From 5767f7d10773204cb705d8f8cd2d76813d3f53c9 Mon Sep 17 00:00:00 2001 From: Michael Lumish Date: Tue, 7 Jul 2020 09:26:52 -0700 Subject: [PATCH] Complete most of the endpoint watcher implementation --- packages/grpc-js/package.json | 2 +- packages/grpc-js/src/xds-client.ts | 243 ++++++++++++++++++++++++++++- 2 files changed, 240 insertions(+), 5 deletions(-) diff --git a/packages/grpc-js/package.json b/packages/grpc-js/package.json index 2e5836f5..28095024 100644 --- a/packages/grpc-js/package.json +++ b/packages/grpc-js/package.json @@ -15,7 +15,7 @@ "types": "build/src/index.d.ts", "license": "Apache-2.0", "devDependencies": { - "@grpc/proto-loader": "^0.6.0-pre3", + "@grpc/proto-loader": "^0.6.0-pre5", "@types/gulp": "^4.0.6", "@types/gulp-mocha": "0.0.32", "@types/lodash": "^4.14.108", diff --git a/packages/grpc-js/src/xds-client.ts b/packages/grpc-js/src/xds-client.ts index e8eea6f3..6d80f3b7 100644 --- a/packages/grpc-js/src/xds-client.ts +++ b/packages/grpc-js/src/xds-client.ts @@ -19,9 +19,26 @@ import * as fs from 'fs'; import * as protoLoader from '@grpc/proto-loader'; import { loadPackageDefinition } from './make-client'; import * as adsTypes from './generated/ads'; -import { ChannelCredentials } from './channel-credentials'; +import * as edsTypes from './generated/endpoint'; +import { ChannelCredentials, createGoogleDefaultCredentials } from './channel-credentials'; +import { loadBootstrapInfo } from './xds-bootstrap'; +import { ClientDuplexStream, ServiceError } from './call'; +import { StatusObject } from './call-stream'; +import { isIPv4, isIPv6 } from 'net'; +import { Status } from './constants'; +import { Metadata } from './metadata'; -const packageDefinition = protoLoader.loadSync([ +const clientVersion = require('../../package.json').version; + +const EDS_TYPE_URL = 'type.googleapis.com/envoy.api.v2.ClusterLoadAssignment'; + +let loadedProtos: Promise | null = null; + +function loadAdsProtos(): Promise { + if (loadedProtos !== null) { + return loadedProtos; + } + loadedProtos = protoLoader.load([ 'envoy/service/discovery/v2/ads.proto', 'envoy/api/v2/listener.proto', 'envoy/api/v2/route.proto', @@ -40,10 +57,228 @@ const packageDefinition = protoLoader.loadSync([ 'deps/googleapis/', 'deps/protoc-gen-validate/' ] - }); + }).then(packageDefinition => loadPackageDefinition(packageDefinition) as unknown as adsTypes.ProtoGrpcType); + return loadedProtos; +} -const loadedDefinition = loadPackageDefinition(packageDefinition) as unknown as adsTypes.ProtoGrpcType; +export interface Watcher { + onValidUpdate(update: UpdateType): void; + onTransientError(error: StatusObject): void; + onResourceDoesNotExist(): void; +} export class XdsClient { + private node: adsTypes.messages.envoy.api.v2.core.Node | null = null; + private client: adsTypes.ClientInterfaces.envoy.service.discovery.v2.AggregatedDiscoveryServiceClient | null = null; + private adsCall: ClientDuplexStream | null = null; + private endpointWatchers: Map[]> = new Map[]>(); + private lastEdsVersionInfo: string = ''; + private lastEdsNonce: string = ''; + + constructor() { + Promise.all([loadBootstrapInfo(), loadAdsProtos()]).then(([bootstrapInfo, protoDefinitions]) => { + this.node = { + ...bootstrapInfo.node, + build_version: `gRPC Node Pure JS ${clientVersion}`, + user_agent_name: 'gRPC Node Pure JS' + } + this.client = new protoDefinitions.envoy.service.discovery.v2.AggregatedDiscoveryService(bootstrapInfo.xdsServers[0].serverUri, createGoogleDefaultCredentials()); + this.maybeStartAdsStream(); + }, (error) => { + // Bubble this error up to any listeners + for (const watcherList of this.endpointWatchers.values()) { + for (const watcher of watcherList) { + watcher.onTransientError({ + code: Status.INTERNAL, + details: `Failed to initialize xDS Client. ${error.message}`, + metadata: new Metadata() + }) + } + } + }); + } + + /** + * Start the ADS stream if the client exists and there is not already an + * existing stream. + */ + private maybeStartAdsStream() { + if (this.client === null) { + return; + } + if (this.adsCall !== null) { + return; + } + this.adsCall = this.client.StreamAggregatedResources(); + this.adsCall.on('data', (message: adsTypes.messages.envoy.api.v2.DiscoveryResponse__Output) => { + switch (message.type_url) { + case EDS_TYPE_URL: + const edsResponses: edsTypes.messages.envoy.api.v2.ClusterLoadAssignment__Output[] = []; + for (const resource of message.resources) { + if (protoLoader.isAnyExtension(resource) && resource['@type'] === EDS_TYPE_URL) { + const resp = resource as protoLoader.AnyExtension & edsTypes.messages.envoy.api.v2.ClusterLoadAssignment__Output; + if (!this.validateEdsResponse(resp)) { + this.nackEds('ClusterLoadAssignment validation failed'); + return; + } + edsResponses.push(resp); + } else { + this.nackEds(`Invalid resource type ${protoLoader.isAnyExtension(resource) ? resource['@type'] : resource.type_url}`); + return; + } + } + for (const message of edsResponses) { + this.handleEdsResponse(message); + } + this.lastEdsVersionInfo = message.version_info; + this.lastEdsNonce = message.nonce; + this.ackEds(); + break; + default: + this.nackUnknown(message.type_url, message.version_info, message.nonce); + } + }); + this.adsCall.on('error', (error: ServiceError) => { + this.adsCall = null; + this.reportStreamError(error); + this.maybeStartAdsStream(); + }); + const endpointWatcherNames = Array.from(this.endpointWatchers.keys()); + if (endpointWatcherNames.length > 0) { + this.adsCall.write({ + node: this.node, + type_url: EDS_TYPE_URL, + resource_names: endpointWatcherNames + }); + } + } + + private nackUnknown(typeUrl: string, versionInfo: string, nonce: string) { + if (!this.adsCall) { + return; + } + this.adsCall.write({ + node: this.node, + type_url: typeUrl, + version_info: versionInfo, + response_nonce: nonce, + error_detail: { + message: `Unknown type_url ${typeUrl}` + } + }); + } + + /** + * Acknowledge an EDS update. This should be called after the local nonce and + * version info are updated so that it sends the post-update values. + */ + private ackEds() { + if (!this.adsCall) { + return; + } + this.adsCall.write({ + node: this.node, + type_url: EDS_TYPE_URL, + resource_names: Array.from(this.endpointWatchers.keys()), + response_nonce: this.lastEdsNonce, + version_info: this.lastEdsVersionInfo + }); + } + + /** + * Reject an EDS update. This should be called without updating the local + * nonce and version info. + */ + private nackEds(message: string) { + if (!this.adsCall) { + return; + } + this.adsCall.write({ + node: this.node, + type_url: EDS_TYPE_URL, + resource_names: Array.from(this.endpointWatchers.keys()), + response_nonce: this.lastEdsNonce, + version_info: this.lastEdsVersionInfo, + error_detail: { + message + } + }); + } + + private validateEdsResponse(message: edsTypes.messages.envoy.api.v2.ClusterLoadAssignment__Output): boolean { + for (const endpoint of message.endpoints) { + for (const lb of endpoint.lb_endpoints) { + if (!lb.endpoint) { + return false; + } + if (!lb.endpoint.address) { + return false; + } + if (!lb.endpoint.address.socket_address) { + return false; + } + const socketAddress = lb.endpoint.address.socket_address; + if (socketAddress.port_specifier !== 'port_value') { + return false; + } + if (!(isIPv4(socketAddress.address) || isIPv6(socketAddress.address))) { + return false; + } + } + } + return true; + } + + private handleEdsResponse(message: edsTypes.messages.envoy.api.v2.ClusterLoadAssignment__Output) { + const watchers = this.endpointWatchers.get(message.cluster_name) ?? []; + for (const watcher of watchers) { + watcher.onValidUpdate(message); + } + } + + private updateEdsNames() { + if (this.adsCall) { + this.adsCall.write({ + node: this.node, + type_url: EDS_TYPE_URL, + resource_names: Array.from(this.endpointWatchers.keys()), + response_nonce: this.lastEdsNonce, + version_info: this.lastEdsVersionInfo + }); + } + } + + private reportStreamError(status: StatusObject) { + for (const watcherList of this.endpointWatchers.values()) { + for (const watcher of watcherList) { + watcher.onTransientError(status); + } + } + // Also do the same for other types of watchers when those are implemented + } + + addEndpointWatcher(edsServiceName: string, watcher: Watcher) { + let watchersEntry = this.endpointWatchers.get(edsServiceName); + let addedServiceName = false; + if (watchersEntry === undefined) { + addedServiceName = true; + watchersEntry = []; + this.endpointWatchers.set(edsServiceName, watchersEntry); + } + watchersEntry.push(watcher); + if (addedServiceName) { + this.updateEdsNames(); + } + } + + removeEndpointWatcher(edsServiceName: string, watcher: Watcher) { + const watchersEntry = this.endpointWatchers.get(edsServiceName); + if (watchersEntry !== undefined) { + const entryIndex = watchersEntry.indexOf(watcher); + if (entryIndex >= 0) { + watchersEntry.splice(entryIndex, 1); + } + } + } } \ No newline at end of file