Merge pull request #1882 from murgatroid99/grpc-js-xds_v2_message_handling

grpc-js-xds: Distinguish v2 and v3 when handling messages
This commit is contained in:
Michael Lumish 2021-08-19 13:50:30 -07:00 committed by GitHub
commit 087bc3ee5b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 72 additions and 38 deletions

View File

@ -212,6 +212,7 @@ class XdsResolver implements Resolver {
private latestRouteConfigName: string | null = null;
private latestRouteConfig: RouteConfiguration__Output | null = null;
private latestRouteConfigIsV2 = false;
private clusterRefcounts = new Map<string, {inLastConfig: boolean, refCount: number}>();
@ -225,7 +226,7 @@ class XdsResolver implements Resolver {
private channelOptions: ChannelOptions
) {
this.ldsWatcher = {
onValidUpdate: (update: Listener__Output) => {
onValidUpdate: (update: Listener__Output, isV2: boolean) => {
const httpConnectionManager = decodeSingleResource(HTTP_CONNECTION_MANGER_TYPE_URL_V3, update.api_listener!.api_listener!.value);
const defaultTimeout = httpConnectionManager.common_http_protocol_options?.idle_timeout;
if (defaultTimeout === null || defaultTimeout === undefined) {
@ -233,7 +234,7 @@ class XdsResolver implements Resolver {
} else {
this.latestDefaultTimeout = protoDurationToDuration(defaultTimeout);
}
if (EXPERIMENTAL_FAULT_INJECTION) {
if (!isV2 && EXPERIMENTAL_FAULT_INJECTION) {
this.ldsHttpFilterConfigs = [];
for (const filter of httpConnectionManager.http_filters) {
// typed_config must be set here, or validation would have failed
@ -259,7 +260,7 @@ class XdsResolver implements Resolver {
if (this.latestRouteConfigName) {
getSingletonXdsClient().removeRouteWatcher(this.latestRouteConfigName, this.rdsWatcher);
}
this.handleRouteConfig(httpConnectionManager.route_config!);
this.handleRouteConfig(httpConnectionManager.route_config!, isV2);
break;
default:
// This is prevented by the validation rules
@ -279,8 +280,8 @@ class XdsResolver implements Resolver {
}
};
this.rdsWatcher = {
onValidUpdate: (update: RouteConfiguration__Output) => {
this.handleRouteConfig(update);
onValidUpdate: (update: RouteConfiguration__Output, isV2: boolean) => {
this.handleRouteConfig(update, isV2);
},
onTransientError: (error: StatusObject) => {
/* A transient error only needs to bubble up as a failure if we have
@ -310,20 +311,21 @@ class XdsResolver implements Resolver {
refCount.refCount -= 1;
if (!refCount.inLastConfig && refCount.refCount === 0) {
this.clusterRefcounts.delete(clusterName);
this.handleRouteConfig(this.latestRouteConfig!);
this.handleRouteConfig(this.latestRouteConfig!, this.latestRouteConfigIsV2);
}
}
}
private handleRouteConfig(routeConfig: RouteConfiguration__Output) {
private handleRouteConfig(routeConfig: RouteConfiguration__Output, isV2: boolean) {
this.latestRouteConfig = routeConfig;
this.latestRouteConfigIsV2 = isV2;
const virtualHost = findVirtualHostForDomain(routeConfig.virtual_hosts, this.target.path);
if (virtualHost === null) {
this.reportResolutionError('No matching route found');
return;
}
const virtualHostHttpFilterOverrides = new Map<string, HttpFilterConfig>();
if (EXPERIMENTAL_FAULT_INJECTION) {
if (!isV2 && EXPERIMENTAL_FAULT_INJECTION) {
for (const [name, filter] of Object.entries(virtualHost.typed_per_filter_config ?? {})) {
const parsedConfig = parseOverrideFilterConfig(filter);
if (parsedConfig) {
@ -352,7 +354,7 @@ class XdsResolver implements Resolver {
timeout = undefined;
}
const routeHttpFilterOverrides = new Map<string, HttpFilterConfig>();
if (EXPERIMENTAL_FAULT_INJECTION) {
if (!isV2 && EXPERIMENTAL_FAULT_INJECTION) {
for (const [name, filter] of Object.entries(route.typed_per_filter_config ?? {})) {
const parsedConfig = parseOverrideFilterConfig(filter);
if (parsedConfig) {
@ -367,7 +369,7 @@ class XdsResolver implements Resolver {
const cluster = route.route!.cluster!;
allConfigClusters.add(cluster);
const extraFilterFactories: FilterFactory<Filter>[] = [];
if (EXPERIMENTAL_FAULT_INJECTION) {
if (!isV2 && EXPERIMENTAL_FAULT_INJECTION) {
for (const filterConfig of this.ldsHttpFilterConfigs) {
if (routeHttpFilterOverrides.has(filterConfig.name)) {
const filter = createHttpFilter(filterConfig.config, routeHttpFilterOverrides.get(filterConfig.name)!);
@ -396,7 +398,7 @@ class XdsResolver implements Resolver {
allConfigClusters.add(clusterWeight.name);
const extraFilterFactories: FilterFactory<Filter>[] = [];
const clusterHttpFilterOverrides = new Map<string, HttpFilterConfig>();
if (EXPERIMENTAL_FAULT_INJECTION) {
if (!isV2 && EXPERIMENTAL_FAULT_INJECTION) {
for (const [name, filter] of Object.entries(clusterWeight.typed_per_filter_config ?? {})) {
const parsedConfig = parseOverrideFilterConfig(filter);
if (parsedConfig) {

View File

@ -435,32 +435,47 @@ export class XdsClient {
private handleAdsResponse(message: DiscoveryResponse__Output) {
let errorString: string | null;
let serviceKind: AdsServiceKind;
let isV2: boolean;
switch (message.type_url) {
case EDS_TYPE_URL_V2:
case CDS_TYPE_URL_V2:
case RDS_TYPE_URL_V2:
case LDS_TYPE_URL_V2:
isV2 = true;
break;
default:
isV2 = false;
}
switch (message.type_url) {
case EDS_TYPE_URL_V2:
case EDS_TYPE_URL_V3:
errorString = this.adsState.eds.handleResponses(
getResponseMessages(EDS_TYPE_URL_V3, [EDS_TYPE_URL_V2, EDS_TYPE_URL_V3], message.resources)
getResponseMessages(EDS_TYPE_URL_V3, [EDS_TYPE_URL_V2, EDS_TYPE_URL_V3], message.resources),
isV2
);
serviceKind = 'eds';
break;
case CDS_TYPE_URL_V2:
case CDS_TYPE_URL_V3:
errorString = this.adsState.cds.handleResponses(
getResponseMessages(CDS_TYPE_URL_V3, [CDS_TYPE_URL_V2, CDS_TYPE_URL_V3], message.resources)
getResponseMessages(CDS_TYPE_URL_V3, [CDS_TYPE_URL_V2, CDS_TYPE_URL_V3], message.resources),
isV2
);
serviceKind = 'cds';
break;
case RDS_TYPE_URL_V2:
case RDS_TYPE_URL_V3:
errorString = this.adsState.rds.handleResponses(
getResponseMessages(RDS_TYPE_URL_V3, [RDS_TYPE_URL_V2, RDS_TYPE_URL_V3], message.resources)
getResponseMessages(RDS_TYPE_URL_V3, [RDS_TYPE_URL_V2, RDS_TYPE_URL_V3], message.resources),
isV2
);
serviceKind = 'rds';
break;
case LDS_TYPE_URL_V2:
case LDS_TYPE_URL_V3:
errorString = this.adsState.lds.handleResponses(
getResponseMessages(LDS_TYPE_URL_V3, [LDS_TYPE_URL_V2, LDS_TYPE_URL_V3], message.resources)
getResponseMessages(LDS_TYPE_URL_V3, [LDS_TYPE_URL_V2, LDS_TYPE_URL_V3], message.resources),
isV2
);
serviceKind = 'lds';
break;

View File

@ -36,6 +36,7 @@ export class CdsState implements XdsStreamState<Cluster__Output> {
>();
private latestResponses: Cluster__Output[] = [];
private latestIsV2 = false;
constructor(
private edsState: EdsState,
@ -61,13 +62,14 @@ export class CdsState implements XdsStreamState<Cluster__Output> {
/* If we have already received an update for the requested edsServiceName,
* immediately pass that update along to the watcher */
const isV2 = this.latestIsV2;
for (const message of this.latestResponses) {
if (message.name === clusterName) {
/* These updates normally occur asynchronously, so we ensure that
* the same happens here */
process.nextTick(() => {
trace('Reporting existing CDS update for new watcher for clusterName ' + clusterName);
watcher.onValidUpdate(message);
watcher.onValidUpdate(message, isV2);
});
}
}
@ -134,7 +136,7 @@ export class CdsState implements XdsStreamState<Cluster__Output> {
}
}
handleResponses(responses: Cluster__Output[]): string | null {
handleResponses(responses: Cluster__Output[], isV2: boolean): string | null {
for (const message of responses) {
if (!this.validateResponse(message)) {
trace('CDS validation failed for message ' + JSON.stringify(message));
@ -142,6 +144,7 @@ export class CdsState implements XdsStreamState<Cluster__Output> {
}
}
this.latestResponses = responses;
this.latestIsV2 = isV2;
const allEdsServiceNames: Set<string> = new Set<string>();
const allClusterNames: Set<string> = new Set<string>();
for (const message of responses) {
@ -152,7 +155,7 @@ export class CdsState implements XdsStreamState<Cluster__Output> {
);
const watchers = this.watchers.get(message.name) ?? [];
for (const watcher of watchers) {
watcher.onValidUpdate(message);
watcher.onValidUpdate(message, isV2);
}
}
trace('Received CDS updates for cluster names ' + Array.from(allClusterNames));

View File

@ -36,6 +36,7 @@ export class EdsState implements XdsStreamState<ClusterLoadAssignment__Output> {
> = new Map<string, Watcher<ClusterLoadAssignment__Output>[]>();
private latestResponses: ClusterLoadAssignment__Output[] = [];
private latestIsV2 = false;
constructor(private updateResourceNames: () => void) {}
@ -61,13 +62,14 @@ export class EdsState implements XdsStreamState<ClusterLoadAssignment__Output> {
/* If we have already received an update for the requested edsServiceName,
* immediately pass that update along to the watcher */
const isV2 = this.latestIsV2;
for (const message of this.latestResponses) {
if (message.cluster_name === edsServiceName) {
/* These updates normally occur asynchronously, so we ensure that
* the same happens here */
process.nextTick(() => {
trace('Reporting existing EDS update for new watcher for edsServiceName ' + edsServiceName);
watcher.onValidUpdate(message);
watcher.onValidUpdate(message, isV2);
});
}
}
@ -143,7 +145,7 @@ export class EdsState implements XdsStreamState<ClusterLoadAssignment__Output> {
}
}
handleResponses(responses: ClusterLoadAssignment__Output[]) {
handleResponses(responses: ClusterLoadAssignment__Output[], isV2: boolean) {
for (const message of responses) {
if (!this.validateResponse(message)) {
trace('EDS validation failed for message ' + JSON.stringify(message));
@ -151,12 +153,13 @@ export class EdsState implements XdsStreamState<ClusterLoadAssignment__Output> {
}
}
this.latestResponses = responses;
this.latestIsV2 = isV2;
const allClusterNames: Set<string> = new Set<string>();
for (const message of responses) {
allClusterNames.add(message.cluster_name);
const watchers = this.watchers.get(message.cluster_name) ?? [];
for (const watcher of watchers) {
watcher.onValidUpdate(message);
watcher.onValidUpdate(message, isV2);
}
}
trace('Received EDS updates for cluster names ' + Array.from(allClusterNames));

View File

@ -39,6 +39,7 @@ export class LdsState implements XdsStreamState<Listener__Output> {
private watchers: Map<string, Watcher<Listener__Output>[]> = new Map<string, Watcher<Listener__Output>[]>();
private latestResponses: Listener__Output[] = [];
private latestIsV2 = false;
constructor(private rdsState: RdsState, private updateResourceNames: () => void) {}
@ -55,13 +56,14 @@ export class LdsState implements XdsStreamState<Listener__Output> {
/* If we have already received an update for the requested edsServiceName,
* immediately pass that update along to the watcher */
const isV2 = this.latestIsV2;
for (const message of this.latestResponses) {
if (message.name === targetName) {
/* These updates normally occur asynchronously, so we ensure that
* the same happens here */
process.nextTick(() => {
trace('Reporting existing RDS update for new watcher for targetName ' + targetName);
watcher.onValidUpdate(message);
watcher.onValidUpdate(message, isV2);
});
}
}
@ -93,7 +95,7 @@ export class LdsState implements XdsStreamState<Listener__Output> {
return Array.from(this.watchers.keys());
}
private validateResponse(message: Listener__Output): boolean {
private validateResponse(message: Listener__Output, isV2: boolean): boolean {
if (
!(
message.api_listener?.api_listener &&
@ -104,7 +106,7 @@ export class LdsState implements XdsStreamState<Listener__Output> {
return false;
}
const httpConnectionManager = decodeSingleResource(HTTP_CONNECTION_MANGER_TYPE_URL_V3, message.api_listener!.api_listener.value);
if (EXPERIMENTAL_FAULT_INJECTION) {
if (!isV2 && EXPERIMENTAL_FAULT_INJECTION) {
const filterNames = new Set<string>();
for (const [index, httpFilter] of httpConnectionManager.http_filters.entries()) {
if (filterNames.has(httpFilter.name)) {
@ -136,7 +138,7 @@ export class LdsState implements XdsStreamState<Listener__Output> {
case 'rds':
return !!httpConnectionManager.rds?.config_source?.ads;
case 'route_config':
return this.rdsState.validateResponse(httpConnectionManager.route_config!);
return this.rdsState.validateResponse(httpConnectionManager.route_config!, isV2);
}
return false;
}
@ -151,20 +153,21 @@ export class LdsState implements XdsStreamState<Listener__Output> {
}
}
handleResponses(responses: Listener__Output[]): string | null {
handleResponses(responses: Listener__Output[], isV2: boolean): string | null {
for (const message of responses) {
if (!this.validateResponse(message)) {
if (!this.validateResponse(message, isV2)) {
trace('LDS validation failed for message ' + JSON.stringify(message));
return 'LDS Error: Route validation failed';
}
}
this.latestResponses = responses;
this.latestIsV2 = isV2;
const allTargetNames = new Set<string>();
for (const message of responses) {
allTargetNames.add(message.name);
const watchers = this.watchers.get(message.name) ?? [];
for (const watcher of watchers) {
watcher.onValidUpdate(message);
watcher.onValidUpdate(message, isV2);
}
}
trace('Received RDS response with route config names ' + Array.from(allTargetNames));

View File

@ -45,6 +45,7 @@ export class RdsState implements XdsStreamState<RouteConfiguration__Output> {
private watchers: Map<string, Watcher<RouteConfiguration__Output>[]> = new Map<string, Watcher<RouteConfiguration__Output>[]>();
private latestResponses: RouteConfiguration__Output[] = [];
private latestIsV2 = false;
constructor(private updateResourceNames: () => void) {}
@ -61,13 +62,14 @@ export class RdsState implements XdsStreamState<RouteConfiguration__Output> {
/* If we have already received an update for the requested edsServiceName,
* immediately pass that update along to the watcher */
const isV2 = this.latestIsV2;
for (const message of this.latestResponses) {
if (message.name === routeConfigName) {
/* These updates normally occur asynchronously, so we ensure that
* the same happens here */
process.nextTick(() => {
trace('Reporting existing RDS update for new watcher for routeConfigName ' + routeConfigName);
watcher.onValidUpdate(message);
watcher.onValidUpdate(message, isV2);
});
}
}
@ -99,7 +101,7 @@ export class RdsState implements XdsStreamState<RouteConfiguration__Output> {
return Array.from(this.watchers.keys());
}
validateResponse(message: RouteConfiguration__Output): boolean {
validateResponse(message: RouteConfiguration__Output, isV2: boolean): boolean {
// https://github.com/grpc/proposal/blob/master/A28-xds-traffic-splitting-and-routing.md#response-validation
for (const virtualHost of message.virtual_hosts) {
for (const domainPattern of virtualHost.domains) {
@ -114,7 +116,7 @@ export class RdsState implements XdsStreamState<RouteConfiguration__Output> {
return false;
}
}
if (EXPERIMENTAL_FAULT_INJECTION) {
if (!isV2 && EXPERIMENTAL_FAULT_INJECTION) {
for (const filterConfig of Object.values(virtualHost.typed_per_filter_config ?? {})) {
if (!validateOverrideFilter(filterConfig)) {
return false;
@ -140,7 +142,7 @@ export class RdsState implements XdsStreamState<RouteConfiguration__Output> {
if ((route.route === undefined) || (route.route === null) || SUPPORTED_CLUSTER_SPECIFIERS.indexOf(route.route.cluster_specifier) < 0) {
return false;
}
if (EXPERIMENTAL_FAULT_INJECTION) {
if (!isV2 && EXPERIMENTAL_FAULT_INJECTION) {
for (const [name, filterConfig] of Object.entries(route.typed_per_filter_config ?? {})) {
if (!validateOverrideFilter(filterConfig)) {
return false;
@ -155,7 +157,7 @@ export class RdsState implements XdsStreamState<RouteConfiguration__Output> {
if (weightSum !== route.route.weighted_clusters!.total_weight?.value ?? 100) {
return false;
}
if (EXPERIMENTAL_FAULT_INJECTION) {
if (!isV2 && EXPERIMENTAL_FAULT_INJECTION) {
for (const weightedCluster of route.route!.weighted_clusters!.clusters) {
for (const filterConfig of Object.values(weightedCluster.typed_per_filter_config ?? {})) {
if (!validateOverrideFilter(filterConfig)) {
@ -180,20 +182,21 @@ export class RdsState implements XdsStreamState<RouteConfiguration__Output> {
}
}
handleResponses(responses: RouteConfiguration__Output[]): string | null {
handleResponses(responses: RouteConfiguration__Output[], isV2: boolean): string | null {
for (const message of responses) {
if (!this.validateResponse(message)) {
if (!this.validateResponse(message, isV2)) {
trace('RDS validation failed for message ' + JSON.stringify(message));
return 'RDS Error: Route validation failed';
}
}
this.latestResponses = responses;
this.latestIsV2 = isV2;
const allRouteConfigNames = new Set<string>();
for (const message of responses) {
allRouteConfigNames.add(message.name);
const watchers = this.watchers.get(message.name) ?? [];
for (const watcher of watchers) {
watcher.onValidUpdate(message);
watcher.onValidUpdate(message, isV2);
}
}
trace('Received RDS response with route config names ' + Array.from(allRouteConfigNames));

View File

@ -18,7 +18,12 @@
import { StatusObject } from "@grpc/grpc-js";
export interface Watcher<UpdateType> {
onValidUpdate(update: UpdateType): void;
/* Including the isV2 flag here is a bit of a kludge. It would probably be
* better for XdsStreamState#handleResponses to transform the protobuf
* message type into a library-specific configuration object type, to
* remove a lot of duplicate logic, including logic for handling that
* flag. */
onValidUpdate(update: UpdateType, isV2: boolean): void;
onTransientError(error: StatusObject): void;
onResourceDoesNotExist(): void;
}
@ -32,7 +37,7 @@ export interface XdsStreamState<ResponseType> {
* or null if it should be acked.
* @param responses
*/
handleResponses(responses: ResponseType[]): string | null;
handleResponses(responses: ResponseType[], isV2: boolean): string | null;
reportStreamError(status: StatusObject): void;
}