From c953a0e212b16e3d269ce3a661799caaf274039e Mon Sep 17 00:00:00 2001 From: Michael Lumish Date: Fri, 12 Feb 2021 13:37:52 -0800 Subject: [PATCH] refactor part of xds-client into seprate files --- packages/grpc-js-xds/src/load-balancer-cds.ts | 3 +- .../src/xds-stream-state/cds-state.ts | 171 +++++++++++++++++ .../src/xds-stream-state/eds-state.ts | 174 ++++++++++++++++++ .../src/xds-stream-state/lds-state.ts | 105 +++++++++++ .../src/xds-stream-state/rds-state.ts | 94 ++++++++++ .../src/xds-stream-state/xds-stream-state.ts | 38 ++++ 6 files changed, 584 insertions(+), 1 deletion(-) create mode 100644 packages/grpc-js-xds/src/xds-stream-state/cds-state.ts create mode 100644 packages/grpc-js-xds/src/xds-stream-state/eds-state.ts create mode 100644 packages/grpc-js-xds/src/xds-stream-state/lds-state.ts create mode 100644 packages/grpc-js-xds/src/xds-stream-state/rds-state.ts create mode 100644 packages/grpc-js-xds/src/xds-stream-state/xds-stream-state.ts diff --git a/packages/grpc-js-xds/src/load-balancer-cds.ts b/packages/grpc-js-xds/src/load-balancer-cds.ts index a2961927..452b1304 100644 --- a/packages/grpc-js-xds/src/load-balancer-cds.ts +++ b/packages/grpc-js-xds/src/load-balancer-cds.ts @@ -16,7 +16,7 @@ */ import { connectivityState, status, Metadata, logVerbosity, experimental } from '@grpc/grpc-js'; -import { XdsClient, Watcher } from './xds-client'; +import { XdsClient } from './xds-client'; import { Cluster__Output } from './generated/envoy/api/v2/Cluster'; import SubchannelAddress = experimental.SubchannelAddress; import UnavailablePicker = experimental.UnavailablePicker; @@ -26,6 +26,7 @@ import ChannelControlHelper = experimental.ChannelControlHelper; import registerLoadBalancerType = experimental.registerLoadBalancerType; import LoadBalancingConfig = experimental.LoadBalancingConfig; import { EdsLoadBalancingConfig } from './load-balancer-eds'; +import { Watcher } from './xds-stream-state/xds-stream-state'; const TRACER_NAME = 'cds_balancer'; diff --git a/packages/grpc-js-xds/src/xds-stream-state/cds-state.ts b/packages/grpc-js-xds/src/xds-stream-state/cds-state.ts new file mode 100644 index 00000000..34308995 --- /dev/null +++ b/packages/grpc-js-xds/src/xds-stream-state/cds-state.ts @@ -0,0 +1,171 @@ +/* + * Copyright 2021 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +import { experimental, logVerbosity, StatusObject } from "@grpc/grpc-js"; +import { Cluster__Output } from "../generated/envoy/api/v2/Cluster"; +import { EdsState } from "./eds-state"; +import { Watcher, XdsStreamState } from "./xds-stream-state"; + +const TRACER_NAME = 'xds_client'; + +function trace(text: string): void { + experimental.trace(logVerbosity.DEBUG, TRACER_NAME, text); +} + +export class CdsState implements XdsStreamState { + versionInfo = ''; + nonce = ''; + + private watchers: Map[]> = new Map< + string, + Watcher[] + >(); + + private latestResponses: Cluster__Output[] = []; + + constructor( + private edsState: EdsState, + private updateResourceNames: () => void + ) {} + + /** + * Add the watcher to the watcher list. Returns true if the list of resource + * names has changed, and false otherwise. + * @param clusterName + * @param watcher + */ + addWatcher(clusterName: string, watcher: Watcher): void { + trace('Adding CDS watcher for clusterName ' + clusterName); + let watchersEntry = this.watchers.get(clusterName); + let addedServiceName = false; + if (watchersEntry === undefined) { + addedServiceName = true; + watchersEntry = []; + this.watchers.set(clusterName, watchersEntry); + } + watchersEntry.push(watcher); + + /* If we have already received an update for the requested edsServiceName, + * immediately pass that update along to the watcher */ + for (const message of this.latestResponses) { + if (message.name === clusterName) { + /* These updates normally occur asynchronously, so we ensure that + * the same happens here */ + process.nextTick(() => { + trace('Reporting existing CDS update for new watcher for clusterName ' + clusterName); + watcher.onValidUpdate(message); + }); + } + } + if (addedServiceName) { + this.updateResourceNames(); + } + } + + removeWatcher(clusterName: string, watcher: Watcher): void { + trace('Removing CDS watcher for clusterName ' + clusterName); + const watchersEntry = this.watchers.get(clusterName); + let removedServiceName = false; + if (watchersEntry !== undefined) { + const entryIndex = watchersEntry.indexOf(watcher); + if (entryIndex >= 0) { + watchersEntry.splice(entryIndex, 1); + } + if (watchersEntry.length === 0) { + removedServiceName = true; + this.watchers.delete(clusterName); + } + } + if (removedServiceName) { + this.updateResourceNames(); + } + } + + getResourceNames(): string[] { + return Array.from(this.watchers.keys()); + } + + private validateResponse(message: Cluster__Output): boolean { + if (message.type !== 'EDS') { + return false; + } + if (!message.eds_cluster_config?.eds_config?.ads) { + return false; + } + if (message.lb_policy !== 'ROUND_ROBIN') { + return false; + } + if (message.lrs_server) { + if (!message.lrs_server.self) { + return false; + } + } + return true; + } + + /** + * Given a list of clusterNames (which may actually be the cluster name), + * for each watcher watching a name not on the list, call that watcher's + * onResourceDoesNotExist method. + * @param allClusterNames + */ + private handleMissingNames(allClusterNames: Set) { + for (const [clusterName, watcherList] of this.watchers.entries()) { + if (!allClusterNames.has(clusterName)) { + trace('Reporting CDS resource does not exist for clusterName ' + clusterName); + for (const watcher of watcherList) { + watcher.onResourceDoesNotExist(); + } + } + } + } + + handleResponses(responses: Cluster__Output[]): string | null { + for (const message of responses) { + if (!this.validateResponse(message)) { + trace('CDS validation failed for message ' + JSON.stringify(message)); + return 'CDS Error: Cluster validation failed'; + } + } + this.latestResponses = responses; + const allEdsServiceNames: Set = new Set(); + const allClusterNames: Set = new Set(); + for (const message of responses) { + allClusterNames.add(message.name); + const edsServiceName = message.eds_cluster_config?.service_name ?? ''; + allEdsServiceNames.add( + edsServiceName === '' ? message.name : edsServiceName + ); + const watchers = this.watchers.get(message.name) ?? []; + for (const watcher of watchers) { + watcher.onValidUpdate(message); + } + } + trace('Received CDS updates for cluster names ' + Array.from(allClusterNames)); + this.handleMissingNames(allClusterNames); + this.edsState.handleMissingNames(allEdsServiceNames); + return null; + } + + reportStreamError(status: StatusObject): void { + for (const watcherList of this.watchers.values()) { + for (const watcher of watcherList) { + watcher.onTransientError(status); + } + } + } +} \ No newline at end of file diff --git a/packages/grpc-js-xds/src/xds-stream-state/eds-state.ts b/packages/grpc-js-xds/src/xds-stream-state/eds-state.ts new file mode 100644 index 00000000..c9beef29 --- /dev/null +++ b/packages/grpc-js-xds/src/xds-stream-state/eds-state.ts @@ -0,0 +1,174 @@ +/* + * Copyright 2021 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +import { experimental, logVerbosity, StatusObject } from "@grpc/grpc-js"; +import { isIPv4, isIPv6 } from "net"; +import { ClusterLoadAssignment__Output } from "../generated/envoy/api/v2/ClusterLoadAssignment"; +import { Watcher, XdsStreamState } from "./xds-stream-state"; + +const TRACER_NAME = 'xds_client'; + +function trace(text: string): void { + experimental.trace(logVerbosity.DEBUG, TRACER_NAME, text); +} + +export class EdsState implements XdsStreamState { + public versionInfo = ''; + public nonce = ''; + + private watchers: Map< + string, + Watcher[] + > = new Map[]>(); + + private latestResponses: ClusterLoadAssignment__Output[] = []; + + constructor(private updateResourceNames: () => void) {} + + /** + * Add the watcher to the watcher list. Returns true if the list of resource + * names has changed, and false otherwise. + * @param edsServiceName + * @param watcher + */ + addWatcher( + edsServiceName: string, + watcher: Watcher + ): void { + let watchersEntry = this.watchers.get(edsServiceName); + let addedServiceName = false; + if (watchersEntry === undefined) { + addedServiceName = true; + watchersEntry = []; + this.watchers.set(edsServiceName, watchersEntry); + } + trace('Adding EDS watcher (' + watchersEntry.length + ' ->' + (watchersEntry.length + 1) + ') for edsServiceName ' + edsServiceName); + watchersEntry.push(watcher); + + /* If we have already received an update for the requested edsServiceName, + * immediately pass that update along to the watcher */ + for (const message of this.latestResponses) { + if (message.cluster_name === edsServiceName) { + /* These updates normally occur asynchronously, so we ensure that + * the same happens here */ + process.nextTick(() => { + trace('Reporting existing EDS update for new watcher for edsServiceName ' + edsServiceName); + watcher.onValidUpdate(message); + }); + } + } + if (addedServiceName) { + this.updateResourceNames(); + } + } + + removeWatcher( + edsServiceName: string, + watcher: Watcher + ): void { + trace('Removing EDS watcher for edsServiceName ' + edsServiceName); + const watchersEntry = this.watchers.get(edsServiceName); + let removedServiceName = false; + if (watchersEntry !== undefined) { + const entryIndex = watchersEntry.indexOf(watcher); + if (entryIndex >= 0) { + trace('Removed EDS watcher (' + watchersEntry.length + ' -> ' + (watchersEntry.length - 1) + ') for edsServiceName ' + edsServiceName); + watchersEntry.splice(entryIndex, 1); + } + if (watchersEntry.length === 0) { + removedServiceName = true; + this.watchers.delete(edsServiceName); + } + } + if (removedServiceName) { + this.updateResourceNames(); + } + } + + getResourceNames(): string[] { + return Array.from(this.watchers.keys()); + } + + /** + * Validate the ClusterLoadAssignment object by these rules: + * https://github.com/grpc/proposal/blob/master/A27-xds-global-load-balancing.md#clusterloadassignment-proto + * @param message + */ + private validateResponse(message: ClusterLoadAssignment__Output) { + for (const endpoint of message.endpoints) { + for (const lb of endpoint.lb_endpoints) { + const socketAddress = lb.endpoint?.address?.socket_address; + if (!socketAddress) { + return false; + } + if (socketAddress.port_specifier !== 'port_value') { + return false; + } + if (!(isIPv4(socketAddress.address) || isIPv6(socketAddress.address))) { + return false; + } + } + } + return true; + } + + /** + * Given a list of edsServiceNames (which may actually be the cluster name), + * for each watcher watching a name not on the list, call that watcher's + * onResourceDoesNotExist method. + * @param allClusterNames + */ + handleMissingNames(allEdsServiceNames: Set) { + for (const [edsServiceName, watcherList] of this.watchers.entries()) { + if (!allEdsServiceNames.has(edsServiceName)) { + trace('Reporting EDS resource does not exist for edsServiceName ' + edsServiceName); + for (const watcher of watcherList) { + watcher.onResourceDoesNotExist(); + } + } + } + } + + handleResponses(responses: ClusterLoadAssignment__Output[]) { + for (const message of responses) { + if (!this.validateResponse(message)) { + trace('EDS validation failed for message ' + JSON.stringify(message)); + return 'EDS Error: ClusterLoadAssignment validation failed'; + } + } + this.latestResponses = responses; + const allClusterNames: Set = new Set(); + for (const message of responses) { + allClusterNames.add(message.cluster_name); + const watchers = this.watchers.get(message.cluster_name) ?? []; + for (const watcher of watchers) { + watcher.onValidUpdate(message); + } + } + trace('Received EDS updates for cluster names ' + Array.from(allClusterNames)); + this.handleMissingNames(allClusterNames); + return null; + } + + reportStreamError(status: StatusObject): void { + for (const watcherList of this.watchers.values()) { + for (const watcher of watcherList) { + watcher.onTransientError(status); + } + } + } +} \ No newline at end of file diff --git a/packages/grpc-js-xds/src/xds-stream-state/lds-state.ts b/packages/grpc-js-xds/src/xds-stream-state/lds-state.ts new file mode 100644 index 00000000..c5db3bfa --- /dev/null +++ b/packages/grpc-js-xds/src/xds-stream-state/lds-state.ts @@ -0,0 +1,105 @@ +/* + * Copyright 2021 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +import * as protoLoader from '@grpc/proto-loader'; +import { experimental, logVerbosity, StatusObject } from "@grpc/grpc-js"; +import { Listener__Output } from "../generated/envoy/api/v2/Listener"; +import { RdsState } from "./rds-state"; +import { XdsStreamState } from "./xds-stream-state"; +import { HttpConnectionManager__Output } from '../generated/envoy/config/filter/network/http_connection_manager/v2/HttpConnectionManager'; + +const TRACER_NAME = 'xds_client'; + +function trace(text: string): void { + experimental.trace(logVerbosity.DEBUG, TRACER_NAME, text); +} + +const HTTP_CONNECTION_MANGER_TYPE_URL = + 'type.googleapis.com/envoy.config.filter.network.http_connection_manager.v2.HttpConnectionManager'; + +export class LdsState implements XdsStreamState { + versionInfo = ''; + nonce = ''; + + constructor(private targetName: string, private rdsState: RdsState) {} + + getResourceNames(): string[] { + return [this.targetName]; + } + + private validateResponse(message: Listener__Output): boolean { + if ( + !( + message.api_listener?.api_listener && + protoLoader.isAnyExtension(message.api_listener.api_listener) && + message.api_listener?.api_listener['@type'] === + HTTP_CONNECTION_MANGER_TYPE_URL + ) + ) { + return false; + } + const httpConnectionManager = message.api_listener + ?.api_listener as protoLoader.AnyExtension & + HttpConnectionManager__Output; + switch (httpConnectionManager.route_specifier) { + case 'rds': + return !!httpConnectionManager.rds?.config_source?.ads; + case 'route_config': + return true; + } + return false; + } + + handleResponses(responses: Listener__Output[]): string | null { + trace('Received LDS update with names ' + responses.map(message => message.name)); + for (const message of responses) { + if (message.name === this.targetName) { + if (this.validateResponse(message)) { + // The validation step ensures that this is correct + const httpConnectionManager = message.api_listener! + .api_listener as protoLoader.AnyExtension & + HttpConnectionManager__Output; + switch (httpConnectionManager.route_specifier) { + case 'rds': + trace('Received LDS update with RDS route config name ' + httpConnectionManager.rds!.route_config_name); + this.rdsState.setRouteConfigName( + httpConnectionManager.rds!.route_config_name + ); + break; + case 'route_config': + trace('Received LDS update with route configuration'); + this.rdsState.setRouteConfigName(null); + this.rdsState.handleSingleMessage( + httpConnectionManager.route_config! + ); + break; + default: + // The validation rules should prevent this + } + } else { + trace('LRS validation error for message ' + JSON.stringify(message)); + return 'LRS Error: Listener validation failed'; + } + } + } + return null; + } + + reportStreamError(status: StatusObject): void { + // Nothing to do here + } +} \ No newline at end of file diff --git a/packages/grpc-js-xds/src/xds-stream-state/rds-state.ts b/packages/grpc-js-xds/src/xds-stream-state/rds-state.ts new file mode 100644 index 00000000..18268587 --- /dev/null +++ b/packages/grpc-js-xds/src/xds-stream-state/rds-state.ts @@ -0,0 +1,94 @@ +/* + * Copyright 2021 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +import { experimental, logVerbosity, StatusObject } from "@grpc/grpc-js"; +import { RouteConfiguration__Output } from "../generated/envoy/api/v2/RouteConfiguration"; +import { CdsLoadBalancingConfig } from "../load-balancer-cds"; +import { Watcher, XdsStreamState } from "./xds-stream-state"; +import ServiceConfig = experimental.ServiceConfig; + +const TRACER_NAME = 'xds_client'; + +function trace(text: string): void { + experimental.trace(logVerbosity.DEBUG, TRACER_NAME, text); +} + +export class RdsState implements XdsStreamState { + versionInfo = ''; + nonce = ''; + + private routeConfigName: string | null = null; + + constructor( + private targetName: string, + private watcher: Watcher, + private updateResouceNames: () => void + ) {} + + getResourceNames(): string[] { + return this.routeConfigName ? [this.routeConfigName] : []; + } + + handleSingleMessage(message: RouteConfiguration__Output) { + for (const virtualHost of message.virtual_hosts) { + if (virtualHost.domains.indexOf(this.targetName) >= 0) { + const route = virtualHost.routes[virtualHost.routes.length - 1]; + if (route.match?.prefix === '' && route.route?.cluster) { + trace('Reporting RDS update for host ' + this.targetName + ' with cluster ' + route.route.cluster); + this.watcher.onValidUpdate({ + methodConfig: [], + loadBalancingConfig: [ + new CdsLoadBalancingConfig(route.route.cluster) + ], + }); + return; + } else { + trace('Discarded matching route with prefix ' + route.match?.prefix + ' and cluster ' + route.route?.cluster); + } + } + } + trace('Reporting RDS resource does not exist from domain lists ' + message.virtual_hosts.map(virtualHost => virtualHost.domains)); + /* If none of the routes match the one we are looking for, bubble up an + * error. */ + this.watcher.onResourceDoesNotExist(); + } + + handleResponses(responses: RouteConfiguration__Output[]): string | null { + trace('Received RDS response with route config names ' + responses.map(message => message.name)); + if (this.routeConfigName !== null) { + for (const message of responses) { + if (message.name === this.routeConfigName) { + this.handleSingleMessage(message); + return null; + } + } + } + return null; + } + + setRouteConfigName(name: string | null) { + const oldName = this.routeConfigName; + this.routeConfigName = name; + if (name !== oldName) { + this.updateResouceNames(); + } + } + + reportStreamError(status: StatusObject): void { + this.watcher.onTransientError(status); + } +} \ No newline at end of file diff --git a/packages/grpc-js-xds/src/xds-stream-state/xds-stream-state.ts b/packages/grpc-js-xds/src/xds-stream-state/xds-stream-state.ts new file mode 100644 index 00000000..83db1781 --- /dev/null +++ b/packages/grpc-js-xds/src/xds-stream-state/xds-stream-state.ts @@ -0,0 +1,38 @@ +/* + * Copyright 2021 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +import { StatusObject } from "@grpc/grpc-js"; + +export interface Watcher { + onValidUpdate(update: UpdateType): void; + onTransientError(error: StatusObject): void; + onResourceDoesNotExist(): void; +} + +export interface XdsStreamState { + versionInfo: string; + nonce: string; + getResourceNames(): string[]; + /** + * Returns a string containing the error details if the message should be nacked, + * or null if it should be acked. + * @param responses + */ + handleResponses(responses: ResponseType[]): string | null; + + reportStreamError(status: StatusObject): void; +} \ No newline at end of file