From 8a2c5af8f70754b87f70eb5fcb07b44891a2a1f8 Mon Sep 17 00:00:00 2001 From: Michael Lumish Date: Wed, 8 Jul 2020 14:48:54 -0700 Subject: [PATCH] Finish up bootstrap and EDS client code --- packages/grpc-js/src/xds-bootstrap.ts | 113 +++++++++++++++++++++++++- packages/grpc-js/src/xds-client.ts | 107 +++++++++++++++++------- 2 files changed, 190 insertions(+), 30 deletions(-) diff --git a/packages/grpc-js/src/xds-bootstrap.ts b/packages/grpc-js/src/xds-bootstrap.ts index 95fd96e3..c8e88a01 100644 --- a/packages/grpc-js/src/xds-bootstrap.ts +++ b/packages/grpc-js/src/xds-bootstrap.ts @@ -75,8 +75,117 @@ function validateXdsServerConfig(obj: any): XdsServerConfig { }; } +function validateValue(obj: any): adsTypes.messages.google.protobuf.Value { + if (Array.isArray(obj)) { + return { + kind: 'listValue', + listValue: { + values: obj.map(value => validateValue(value)) + } + } + } else { + switch (typeof obj) { + case 'boolean': + return { + kind: 'boolValue', + boolValue: obj + }; + case 'number': + return { + kind: 'numberValue', + numberValue: obj + }; + case 'string': + return { + kind: 'stringValue', + stringValue: obj + }; + case 'object': + if (obj === null) { + return { + kind: 'nullValue', + nullValue: 'NULL_VALUE' + }; + } else { + return { + kind: 'structValue', + structValue: getStructFromJson(obj) + }; + } + default: + throw new Error(`Could not handle struct value of type ${typeof obj}`); + } + } +} + +function getStructFromJson(obj: any): adsTypes.messages.google.protobuf.Struct { + if (typeof obj !== 'object' || obj === null) { + throw new Error('Invalid JSON object for Struct field'); + } + const result = Object.keys(obj).map(key => validateValue(key)); + if (result.length === 1) { + return { + fields: result[0] + } + } else { + return { + fields: { + kind: 'listValue', + listValue: { + values: result + } + } + } + }; +} + +/** + * Validate that the input obj is a valid Node proto message. Only checks the + * fields we expect to see: id, cluster, locality, and metadata. + * @param obj + */ function validateNode(obj: any): adsTypes.messages.envoy.api.v2.core.Node { - throw new Error('Not implemented'); + const result: adsTypes.messages.envoy.api.v2.core.Node = {}; + if (!('id' in obj)) { + throw new Error('id field missing in node element'); + } + if (typeof obj.id !== 'string') { + throw new Error(`node.id field: expected string, got ${typeof obj.id}`); + } + result.id = obj.id; + if (!('cluster' in obj)) { + throw new Error('cluster field missing in node element'); + } + if (typeof obj.cluster !== 'string') { + throw new Error(`node.cluster field: expected string, got ${typeof obj.cluster}`); + } + result.cluster = obj.cluster; + if (!('locality' in obj)) { + throw new Error('locality field missing in node element'); + } + result.locality = {}; + if ('region' in obj.locality) { + if (typeof obj.locality.region !== 'string') { + throw new Error(`node.locality.region field: expected string, got ${typeof obj.locality.region}`); + } + result.locality.region = obj.locality.region; + } + if ('zone' in obj.locality) { + if (typeof obj.locality.region !== 'string') { + throw new Error(`node.locality.zone field: expected string, got ${typeof obj.locality.zone}`); + } + result.locality.zone = obj.locality.zone; + } + if ('sub_zone' in obj.locality) { + if (typeof obj.locality.sub_zone !== 'string') { + throw new Error(`node.locality.sub_zone field: expected string, got ${typeof obj.locality.sub_zone}`); + } + result.locality.sub_zone = obj.locality.sub_zone; + } + if ('metadata' in obj) { + result.metadata = getStructFromJson(obj.metadata); + } + return result; } function validateBootstrapFile(obj: any): BootstrapInfo { @@ -94,7 +203,7 @@ export async function loadBootstrapInfo(): Promise { } const bootstrapPath = process.env.GRPC_XDS_BOOTSTRAP; if (bootstrapPath === undefined) { - return Promise.reject(new Error('GRPC_XDS_BOOTSTRAP environment variable needs to be set to the path to the bootstrap file to use xDS')); + return Promise.reject(new Error('The GRPC_XDS_BOOTSTRAP environment variable needs to be set to the path to the bootstrap file to use xDS')); } loadedBootstrapInfo = new Promise((resolve, reject) => { fs.readFile(bootstrapPath, { encoding: 'utf8'}, (err, data) => { diff --git a/packages/grpc-js/src/xds-client.ts b/packages/grpc-js/src/xds-client.ts index 6d80f3b7..05076214 100644 --- a/packages/grpc-js/src/xds-client.ts +++ b/packages/grpc-js/src/xds-client.ts @@ -15,18 +15,26 @@ * */ -import * as fs from 'fs'; import * as protoLoader from '@grpc/proto-loader'; import { loadPackageDefinition } from './make-client'; import * as adsTypes from './generated/ads'; import * as edsTypes from './generated/endpoint'; -import { ChannelCredentials, createGoogleDefaultCredentials } from './channel-credentials'; +import { createGoogleDefaultCredentials } from './channel-credentials'; import { loadBootstrapInfo } from './xds-bootstrap'; import { ClientDuplexStream, ServiceError } from './call'; import { StatusObject } from './call-stream'; import { isIPv4, isIPv6 } from 'net'; -import { Status } from './constants'; +import { Status, LogVerbosity } from './constants'; import { Metadata } from './metadata'; +import * as logging from './logging'; +import { ServiceConfig } from './service-config'; +import { ChannelOptions } from './channel-options'; + +const TRACER_NAME = 'xds_client'; + +function trace(text: string): void { + logging.trace(LogVerbosity.DEBUG, TRACER_NAME, text); +} const clientVersion = require('../../package.json').version; @@ -72,36 +80,57 @@ export class XdsClient { private client: adsTypes.ClientInterfaces.envoy.service.discovery.v2.AggregatedDiscoveryServiceClient | null = null; private adsCall: ClientDuplexStream | null = null; + private hasShutdown: boolean = false; + private endpointWatchers: Map[]> = new Map[]>(); private lastEdsVersionInfo: string = ''; private lastEdsNonce: string = ''; - constructor() { + constructor(private targetName: string, private serviceConfigWatcher: Watcher, channelOptions: ChannelOptions) { + const channelArgs = {...channelOptions}; + const channelArgsToRemove = [ + /* The SSL target name override corresponds to the target, and this + * client has its own target */ + 'grpc.ssl_target_name_override', + /* The default authority also corresponds to the target */ + 'grpc.default_authority', + /* This client will have its own specific keepalive time setting */ + 'grpc.keepalive_time_ms', + /* The service config specifies the load balancing policy. This channel + * needs its own separate load balancing policy setting. In particular, + * recursively using an xDS load balancer for the xDS client would be + * bad */ + 'grpc.service_config' + ]; + for (const arg of channelArgsToRemove) { + delete channelArgs[arg]; + } + channelArgs['grpc.keepalive_time_ms'] = 5000; Promise.all([loadBootstrapInfo(), loadAdsProtos()]).then(([bootstrapInfo, protoDefinitions]) => { + if (this.hasShutdown) { + return; + } this.node = { ...bootstrapInfo.node, build_version: `gRPC Node Pure JS ${clientVersion}`, user_agent_name: 'gRPC Node Pure JS' } - this.client = new protoDefinitions.envoy.service.discovery.v2.AggregatedDiscoveryService(bootstrapInfo.xdsServers[0].serverUri, createGoogleDefaultCredentials()); + this.client = new protoDefinitions.envoy.service.discovery.v2.AggregatedDiscoveryService(bootstrapInfo.xdsServers[0].serverUri, createGoogleDefaultCredentials(), channelArgs); this.maybeStartAdsStream(); }, (error) => { + trace('Failed to initialize xDS Client. ' + error.message); // Bubble this error up to any listeners - for (const watcherList of this.endpointWatchers.values()) { - for (const watcher of watcherList) { - watcher.onTransientError({ - code: Status.INTERNAL, - details: `Failed to initialize xDS Client. ${error.message}`, - metadata: new Metadata() - }) - } - } + this.reportStreamError({ + code: Status.INTERNAL, + details: `Failed to initialize xDS Client. ${error.message}`, + metadata: new Metadata() + }); }); } /** * Start the ADS stream if the client exists and there is not already an - * existing stream. + * existing stream, and there */ private maybeStartAdsStream() { if (this.client === null) { @@ -110,6 +139,9 @@ export class XdsClient { if (this.adsCall !== null) { return; } + if (this.hasShutdown) { + return; + } this.adsCall = this.client.StreamAggregatedResources(); this.adsCall.on('data', (message: adsTypes.messages.envoy.api.v2.DiscoveryResponse__Output) => { switch (message.type_url) { @@ -140,14 +172,18 @@ export class XdsClient { } }); this.adsCall.on('error', (error: ServiceError) => { + trace('ADS stream ended. code=' + error.code + ' details= ' + error.details); this.adsCall = null; this.reportStreamError(error); + /* Connection backoff is handled by the client object, so we can + * immediately start a new request to indicate that it should try to + * reconnect */ this.maybeStartAdsStream(); }); const endpointWatcherNames = Array.from(this.endpointWatchers.keys()); if (endpointWatcherNames.length > 0) { this.adsCall.write({ - node: this.node, + node: this.node!, type_url: EDS_TYPE_URL, resource_names: endpointWatcherNames }); @@ -159,7 +195,7 @@ export class XdsClient { return; } this.adsCall.write({ - node: this.node, + node: this.node!, type_url: typeUrl, version_info: versionInfo, response_nonce: nonce, @@ -178,7 +214,7 @@ export class XdsClient { return; } this.adsCall.write({ - node: this.node, + node: this.node!, type_url: EDS_TYPE_URL, resource_names: Array.from(this.endpointWatchers.keys()), response_nonce: this.lastEdsNonce, @@ -195,7 +231,7 @@ export class XdsClient { return; } this.adsCall.write({ - node: this.node, + node: this.node!, type_url: EDS_TYPE_URL, resource_names: Array.from(this.endpointWatchers.keys()), response_nonce: this.lastEdsNonce, @@ -206,19 +242,18 @@ export class XdsClient { }); } + /** + * Validate the ClusterLoadAssignment object by these rules: + * https://github.com/grpc/proposal/blob/master/A27-xds-global-load-balancing.md#clusterloadassignment-proto + * @param message + */ private validateEdsResponse(message: edsTypes.messages.envoy.api.v2.ClusterLoadAssignment__Output): boolean { for (const endpoint of message.endpoints) { for (const lb of endpoint.lb_endpoints) { - if (!lb.endpoint) { + const socketAddress = lb.endpoint?.address?.socket_address; + if (!socketAddress) { return false; } - if (!lb.endpoint.address) { - return false; - } - if (!lb.endpoint.address.socket_address) { - return false; - } - const socketAddress = lb.endpoint.address.socket_address; if (socketAddress.port_specifier !== 'port_value') { return false; } @@ -240,7 +275,7 @@ export class XdsClient { private updateEdsNames() { if (this.adsCall) { this.adsCall.write({ - node: this.node, + node: this.node!, type_url: EDS_TYPE_URL, resource_names: Array.from(this.endpointWatchers.keys()), response_nonce: this.lastEdsNonce, @@ -259,6 +294,7 @@ export class XdsClient { } addEndpointWatcher(edsServiceName: string, watcher: Watcher) { + trace('Watcher added for endpoint ' + edsServiceName); let watchersEntry = this.endpointWatchers.get(edsServiceName); let addedServiceName = false; if (watchersEntry === undefined) { @@ -273,12 +309,27 @@ export class XdsClient { } removeEndpointWatcher(edsServiceName: string, watcher: Watcher) { + trace('Watcher removed for endpoint ' + edsServiceName); const watchersEntry = this.endpointWatchers.get(edsServiceName); + let removedServiceName = false; if (watchersEntry !== undefined) { const entryIndex = watchersEntry.indexOf(watcher); if (entryIndex >= 0) { watchersEntry.splice(entryIndex, 1); } + if (watchersEntry.length === 0) { + removedServiceName = true; + this.endpointWatchers.delete(edsServiceName); + } + } + if (removedServiceName) { + this.updateEdsNames(); } } + + shutdown(): void { + this.adsCall?.cancel(); + this.client?.close(); + this.hasShutdown = true; + } } \ No newline at end of file