Finish up bootstrap and EDS client code

This commit is contained in:
Michael Lumish 2020-07-08 14:48:54 -07:00
parent 5767f7d107
commit 8a2c5af8f7
2 changed files with 190 additions and 30 deletions

View File

@ -75,8 +75,117 @@ function validateXdsServerConfig(obj: any): XdsServerConfig {
};
}
function validateValue(obj: any): adsTypes.messages.google.protobuf.Value {
if (Array.isArray(obj)) {
return {
kind: 'listValue',
listValue: {
values: obj.map(value => validateValue(value))
}
}
} else {
switch (typeof obj) {
case 'boolean':
return {
kind: 'boolValue',
boolValue: obj
};
case 'number':
return {
kind: 'numberValue',
numberValue: obj
};
case 'string':
return {
kind: 'stringValue',
stringValue: obj
};
case 'object':
if (obj === null) {
return {
kind: 'nullValue',
nullValue: 'NULL_VALUE'
};
} else {
return {
kind: 'structValue',
structValue: getStructFromJson(obj)
};
}
default:
throw new Error(`Could not handle struct value of type ${typeof obj}`);
}
}
}
function getStructFromJson(obj: any): adsTypes.messages.google.protobuf.Struct {
if (typeof obj !== 'object' || obj === null) {
throw new Error('Invalid JSON object for Struct field');
}
const result = Object.keys(obj).map(key => validateValue(key));
if (result.length === 1) {
return {
fields: result[0]
}
} else {
return {
fields: {
kind: 'listValue',
listValue: {
values: result
}
}
}
};
}
/**
* Validate that the input obj is a valid Node proto message. Only checks the
* fields we expect to see: id, cluster, locality, and metadata.
* @param obj
*/
function validateNode(obj: any): adsTypes.messages.envoy.api.v2.core.Node {
throw new Error('Not implemented');
const result: adsTypes.messages.envoy.api.v2.core.Node = {};
if (!('id' in obj)) {
throw new Error('id field missing in node element');
}
if (typeof obj.id !== 'string') {
throw new Error(`node.id field: expected string, got ${typeof obj.id}`);
}
result.id = obj.id;
if (!('cluster' in obj)) {
throw new Error('cluster field missing in node element');
}
if (typeof obj.cluster !== 'string') {
throw new Error(`node.cluster field: expected string, got ${typeof obj.cluster}`);
}
result.cluster = obj.cluster;
if (!('locality' in obj)) {
throw new Error('locality field missing in node element');
}
result.locality = {};
if ('region' in obj.locality) {
if (typeof obj.locality.region !== 'string') {
throw new Error(`node.locality.region field: expected string, got ${typeof obj.locality.region}`);
}
result.locality.region = obj.locality.region;
}
if ('zone' in obj.locality) {
if (typeof obj.locality.region !== 'string') {
throw new Error(`node.locality.zone field: expected string, got ${typeof obj.locality.zone}`);
}
result.locality.zone = obj.locality.zone;
}
if ('sub_zone' in obj.locality) {
if (typeof obj.locality.sub_zone !== 'string') {
throw new Error(`node.locality.sub_zone field: expected string, got ${typeof obj.locality.sub_zone}`);
}
result.locality.sub_zone = obj.locality.sub_zone;
}
if ('metadata' in obj) {
result.metadata = getStructFromJson(obj.metadata);
}
return result;
}
function validateBootstrapFile(obj: any): BootstrapInfo {
@ -94,7 +203,7 @@ export async function loadBootstrapInfo(): Promise<BootstrapInfo> {
}
const bootstrapPath = process.env.GRPC_XDS_BOOTSTRAP;
if (bootstrapPath === undefined) {
return Promise.reject(new Error('GRPC_XDS_BOOTSTRAP environment variable needs to be set to the path to the bootstrap file to use xDS'));
return Promise.reject(new Error('The GRPC_XDS_BOOTSTRAP environment variable needs to be set to the path to the bootstrap file to use xDS'));
}
loadedBootstrapInfo = new Promise((resolve, reject) => {
fs.readFile(bootstrapPath, { encoding: 'utf8'}, (err, data) => {

View File

@ -15,18 +15,26 @@
*
*/
import * as fs from 'fs';
import * as protoLoader from '@grpc/proto-loader';
import { loadPackageDefinition } from './make-client';
import * as adsTypes from './generated/ads';
import * as edsTypes from './generated/endpoint';
import { ChannelCredentials, createGoogleDefaultCredentials } from './channel-credentials';
import { createGoogleDefaultCredentials } from './channel-credentials';
import { loadBootstrapInfo } from './xds-bootstrap';
import { ClientDuplexStream, ServiceError } from './call';
import { StatusObject } from './call-stream';
import { isIPv4, isIPv6 } from 'net';
import { Status } from './constants';
import { Status, LogVerbosity } from './constants';
import { Metadata } from './metadata';
import * as logging from './logging';
import { ServiceConfig } from './service-config';
import { ChannelOptions } from './channel-options';
const TRACER_NAME = 'xds_client';
function trace(text: string): void {
logging.trace(LogVerbosity.DEBUG, TRACER_NAME, text);
}
const clientVersion = require('../../package.json').version;
@ -72,36 +80,57 @@ export class XdsClient {
private client: adsTypes.ClientInterfaces.envoy.service.discovery.v2.AggregatedDiscoveryServiceClient | null = null;
private adsCall: ClientDuplexStream<adsTypes.messages.envoy.api.v2.DiscoveryRequest, adsTypes.messages.envoy.api.v2.DiscoveryResponse__Output> | null = null;
private hasShutdown: boolean = false;
private endpointWatchers: Map<string, Watcher<edsTypes.messages.envoy.api.v2.ClusterLoadAssignment__Output>[]> = new Map<string, Watcher<edsTypes.messages.envoy.api.v2.ClusterLoadAssignment__Output>[]>();
private lastEdsVersionInfo: string = '';
private lastEdsNonce: string = '';
constructor() {
constructor(private targetName: string, private serviceConfigWatcher: Watcher<ServiceConfig>, channelOptions: ChannelOptions) {
const channelArgs = {...channelOptions};
const channelArgsToRemove = [
/* The SSL target name override corresponds to the target, and this
* client has its own target */
'grpc.ssl_target_name_override',
/* The default authority also corresponds to the target */
'grpc.default_authority',
/* This client will have its own specific keepalive time setting */
'grpc.keepalive_time_ms',
/* The service config specifies the load balancing policy. This channel
* needs its own separate load balancing policy setting. In particular,
* recursively using an xDS load balancer for the xDS client would be
* bad */
'grpc.service_config'
];
for (const arg of channelArgsToRemove) {
delete channelArgs[arg];
}
channelArgs['grpc.keepalive_time_ms'] = 5000;
Promise.all([loadBootstrapInfo(), loadAdsProtos()]).then(([bootstrapInfo, protoDefinitions]) => {
if (this.hasShutdown) {
return;
}
this.node = {
...bootstrapInfo.node,
build_version: `gRPC Node Pure JS ${clientVersion}`,
user_agent_name: 'gRPC Node Pure JS'
}
this.client = new protoDefinitions.envoy.service.discovery.v2.AggregatedDiscoveryService(bootstrapInfo.xdsServers[0].serverUri, createGoogleDefaultCredentials());
this.client = new protoDefinitions.envoy.service.discovery.v2.AggregatedDiscoveryService(bootstrapInfo.xdsServers[0].serverUri, createGoogleDefaultCredentials(), channelArgs);
this.maybeStartAdsStream();
}, (error) => {
trace('Failed to initialize xDS Client. ' + error.message);
// Bubble this error up to any listeners
for (const watcherList of this.endpointWatchers.values()) {
for (const watcher of watcherList) {
watcher.onTransientError({
code: Status.INTERNAL,
details: `Failed to initialize xDS Client. ${error.message}`,
metadata: new Metadata()
})
}
}
this.reportStreamError({
code: Status.INTERNAL,
details: `Failed to initialize xDS Client. ${error.message}`,
metadata: new Metadata()
});
});
}
/**
* Start the ADS stream if the client exists and there is not already an
* existing stream.
* existing stream, and there
*/
private maybeStartAdsStream() {
if (this.client === null) {
@ -110,6 +139,9 @@ export class XdsClient {
if (this.adsCall !== null) {
return;
}
if (this.hasShutdown) {
return;
}
this.adsCall = this.client.StreamAggregatedResources();
this.adsCall.on('data', (message: adsTypes.messages.envoy.api.v2.DiscoveryResponse__Output) => {
switch (message.type_url) {
@ -140,14 +172,18 @@ export class XdsClient {
}
});
this.adsCall.on('error', (error: ServiceError) => {
trace('ADS stream ended. code=' + error.code + ' details= ' + error.details);
this.adsCall = null;
this.reportStreamError(error);
/* Connection backoff is handled by the client object, so we can
* immediately start a new request to indicate that it should try to
* reconnect */
this.maybeStartAdsStream();
});
const endpointWatcherNames = Array.from(this.endpointWatchers.keys());
if (endpointWatcherNames.length > 0) {
this.adsCall.write({
node: this.node,
node: this.node!,
type_url: EDS_TYPE_URL,
resource_names: endpointWatcherNames
});
@ -159,7 +195,7 @@ export class XdsClient {
return;
}
this.adsCall.write({
node: this.node,
node: this.node!,
type_url: typeUrl,
version_info: versionInfo,
response_nonce: nonce,
@ -178,7 +214,7 @@ export class XdsClient {
return;
}
this.adsCall.write({
node: this.node,
node: this.node!,
type_url: EDS_TYPE_URL,
resource_names: Array.from(this.endpointWatchers.keys()),
response_nonce: this.lastEdsNonce,
@ -195,7 +231,7 @@ export class XdsClient {
return;
}
this.adsCall.write({
node: this.node,
node: this.node!,
type_url: EDS_TYPE_URL,
resource_names: Array.from(this.endpointWatchers.keys()),
response_nonce: this.lastEdsNonce,
@ -206,19 +242,18 @@ export class XdsClient {
});
}
/**
* Validate the ClusterLoadAssignment object by these rules:
* https://github.com/grpc/proposal/blob/master/A27-xds-global-load-balancing.md#clusterloadassignment-proto
* @param message
*/
private validateEdsResponse(message: edsTypes.messages.envoy.api.v2.ClusterLoadAssignment__Output): boolean {
for (const endpoint of message.endpoints) {
for (const lb of endpoint.lb_endpoints) {
if (!lb.endpoint) {
const socketAddress = lb.endpoint?.address?.socket_address;
if (!socketAddress) {
return false;
}
if (!lb.endpoint.address) {
return false;
}
if (!lb.endpoint.address.socket_address) {
return false;
}
const socketAddress = lb.endpoint.address.socket_address;
if (socketAddress.port_specifier !== 'port_value') {
return false;
}
@ -240,7 +275,7 @@ export class XdsClient {
private updateEdsNames() {
if (this.adsCall) {
this.adsCall.write({
node: this.node,
node: this.node!,
type_url: EDS_TYPE_URL,
resource_names: Array.from(this.endpointWatchers.keys()),
response_nonce: this.lastEdsNonce,
@ -259,6 +294,7 @@ export class XdsClient {
}
addEndpointWatcher(edsServiceName: string, watcher: Watcher<edsTypes.messages.envoy.api.v2.ClusterLoadAssignment__Output>) {
trace('Watcher added for endpoint ' + edsServiceName);
let watchersEntry = this.endpointWatchers.get(edsServiceName);
let addedServiceName = false;
if (watchersEntry === undefined) {
@ -273,12 +309,27 @@ export class XdsClient {
}
removeEndpointWatcher(edsServiceName: string, watcher: Watcher<edsTypes.messages.envoy.api.v2.ClusterLoadAssignment__Output>) {
trace('Watcher removed for endpoint ' + edsServiceName);
const watchersEntry = this.endpointWatchers.get(edsServiceName);
let removedServiceName = false;
if (watchersEntry !== undefined) {
const entryIndex = watchersEntry.indexOf(watcher);
if (entryIndex >= 0) {
watchersEntry.splice(entryIndex, 1);
}
if (watchersEntry.length === 0) {
removedServiceName = true;
this.endpointWatchers.delete(edsServiceName);
}
}
if (removedServiceName) {
this.updateEdsNames();
}
}
shutdown(): void {
this.adsCall?.cancel();
this.client?.close();
this.hasShutdown = true;
}
}