grpc-node/packages/grpc-js-xds/src/xds-client.ts

1339 lines
44 KiB
TypeScript

/*
* Copyright 2023 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
import { Channel, ChannelCredentials, ClientDuplexStream, Metadata, StatusObject, connectivityState, experimental, loadPackageDefinition, logVerbosity, status } from "@grpc/grpc-js";
import { XdsDecodeContext, XdsDecodeResult, XdsResourceType } from "./xds-resource-type/xds-resource-type";
import { XdsResourceName, parseXdsResourceName, xdsResourceNameToString } from "./resources";
import { Node } from "./generated/envoy/config/core/v3/Node";
import { BootstrapInfo, CertificateProviderConfig, XdsServerConfig, loadBootstrapInfo, serverConfigEqual } from "./xds-bootstrap";
import BackoffTimeout = experimental.BackoffTimeout;
import { DiscoveryRequest } from "./generated/envoy/service/discovery/v3/DiscoveryRequest";
import { DiscoveryResponse__Output } from "./generated/envoy/service/discovery/v3/DiscoveryResponse";
import * as adsTypes from './generated/ads';
import * as lrsTypes from './generated/lrs';
import * as protoLoader from '@grpc/proto-loader';
import { AggregatedDiscoveryServiceClient } from "./generated/envoy/service/discovery/v3/AggregatedDiscoveryService";
import { LoadReportingServiceClient } from "./generated/envoy/service/load_stats/v3/LoadReportingService";
import { createGoogleDefaultCredentials } from "./google-default-credentials";
import { Any__Output } from "./generated/google/protobuf/Any";
import { LoadStatsRequest } from "./generated/envoy/service/load_stats/v3/LoadStatsRequest";
import { LoadStatsResponse__Output } from "./generated/envoy/service/load_stats/v3/LoadStatsResponse";
import { Locality, Locality__Output } from "./generated/envoy/config/core/v3/Locality";
import { Duration } from "./generated/google/protobuf/Duration";
import { registerXdsClientWithCsds } from "./csds";
import CertificateProvider = experimental.CertificateProvider;
import FileWatcherCertificateProvider = experimental.FileWatcherCertificateProvider;
/** Tracer name under which every log line from this module is emitted. */
const TRACER_NAME = 'xds_client';

/**
 * Writes a DEBUG-verbosity trace line tagged with the xDS client tracer name.
 * @param text The message to emit.
 */
function trace(text: string): void {
  const verbosity = logVerbosity.DEBUG;
  experimental.trace(verbosity, TRACER_NAME, text);
}
/**
 * Cache for the loaded ADS + LRS service definitions. Populated on the first
 * call to loadAdsProtos() so the .proto files are only read and parsed once.
 */
let loadedProtos: adsTypes.ProtoGrpcType & lrsTypes.ProtoGrpcType | null = null;

/**
 * Loads the ADS and LRS proto definitions, synchronously reading the .proto
 * files on the first call and returning the cached result afterwards.
 * @returns The loaded package definitions for the ADS and LRS services.
 */
function loadAdsProtos(): adsTypes.ProtoGrpcType & lrsTypes.ProtoGrpcType {
  if (loadedProtos !== null) {
    return loadedProtos;
  }
  const packageDefinition = protoLoader.loadSync(
    [
      'envoy/service/discovery/v3/ads.proto',
      'envoy/service/load_stats/v3/lrs.proto',
    ],
    {
      keepCase: true,
      longs: String,
      enums: String,
      defaults: true,
      oneofs: true,
      json: true,
      includeDirs: [
        // Paths are relative to src/build
        __dirname + '/../../deps/envoy-api/',
        __dirname + '/../../deps/xds/',
        __dirname + '/../../deps/googleapis/',
        __dirname + '/../../deps/protoc-gen-validate/',
      ],
    }
  );
  // Store the result so that later calls hit the cache check above; the
  // previous implementation never assigned the cache and reloaded every time.
  loadedProtos = (loadPackageDefinition(packageDefinition) as unknown) as adsTypes.ProtoGrpcType & lrsTypes.ProtoGrpcType;
  return loadedProtos;
}
// Version string read from the package.json two directories above this
// module; reported as user_agent_version in the node sent to the xDS server.
const clientVersion = require('../../package.json').version;
/**
 * Type-erased watcher contract used internally by the xDS client. Resource
 * values are passed as plain `object`s; the `Watcher` class adapts this to a
 * typed `BasicWatcher`.
 */
export interface ResourceWatcherInterface {
/** Called with the decoded resource value when a new or changed value is accepted. */
onGenericResourceChanged(resource: object): void;
/** Called with a status describing a validation, connection, or stream failure. */
onError(status: StatusObject): void;
/** Called when the resource is determined not to exist on the server. */
onResourceDoesNotExist(): void;
}
/**
 * Typed watcher callbacks for a single resource type. Instances are wrapped
 * in a `Watcher` to be used where a `ResourceWatcherInterface` is expected.
 */
export interface BasicWatcher<UpdateType> {
/** Called with the new resource value. */
onResourceChanged(resource: UpdateType): void;
/** Called with a status describing a failure related to the resource. */
onError(status: StatusObject): void;
/** Called when the resource does not exist. */
onResourceDoesNotExist(): void;
}
/**
 * Adapter that exposes a typed `BasicWatcher` through the type-erased
 * `ResourceWatcherInterface`. Every callback is forwarded unchanged to the
 * wrapped watcher.
 */
export class Watcher<UpdateType> implements ResourceWatcherInterface {
  constructor(private internalWatcher: BasicWatcher<UpdateType>) {}

  /**
   * Forwards a resource update, casting the untyped resource to `UpdateType`.
   * No runtime validation is performed on the value.
   */
  onGenericResourceChanged(resource: object): void {
    const typedResource = resource as unknown as UpdateType;
    this.internalWatcher.onResourceChanged(typedResource);
  }

  /** Forwards an error status to the wrapped watcher. */
  onError(status: StatusObject) {
    const delegate = this.internalWatcher;
    delegate.onError(status);
  }

  /** Forwards a does-not-exist notification to the wrapped watcher. */
  onResourceDoesNotExist() {
    const delegate = this.internalWatcher;
    delegate.onResourceDoesNotExist();
  }
}
/** How long to wait for a requested resource before reporting it as missing. */
const RESOURCE_TIMEOUT_MS = 15_000;

/**
 * Tracks the "does not exist" timeout for one subscribed resource on one ADS
 * call. The timer is started when the stream is marked as started, and is
 * skipped if the resource has already been seen or has a cached value.
 */
class ResourceTimer {
  private timer: NodeJS.Timeout | null = null;
  private resourceSeen = false;
  constructor(private callState: AdsCallState, private type: XdsResourceType, private name: XdsResourceName) {}

  /** Cancels the pending timeout, if one is scheduled. */
  maybeCancelTimer() {
    if (!this.timer) {
      return;
    }
    clearTimeout(this.timer);
    this.timer = null;
  }

  /** Records that the resource appeared in a response and stops the timeout. */
  markSeen() {
    this.resourceSeen = true;
    this.maybeCancelTimer();
  }

  /** Notifies the timer that the ADS stream has started. */
  markAdsStreamStarted() {
    this.maybeStartTimer();
  }

  /**
   * Schedules the timeout unless the resource was already seen, a timer is
   * already set, the authority is unknown, or a cached value exists.
   */
  private maybeStartTimer() {
    if (this.resourceSeen || this.timer) {
      return;
    }
    const authorityStates = this.callState.client.xdsClient.authorityStateMap;
    const authorityState = authorityStates.get(this.name.authority);
    if (!authorityState) {
      return;
    }
    const cached = authorityState.resourceMap.get(this.type)?.get(this.name.key)?.cachedResource;
    if (cached) {
      return;
    }
    this.timer = setTimeout(() => {
      this.onTimer();
    }, RESOURCE_TIMEOUT_MS);
  }

  /**
   * Timeout handler: marks the resource DOES_NOT_EXIST and notifies every
   * watcher, provided the resource state is still registered.
   */
  private onTimer() {
    const resourceState = this.callState.client.xdsClient.authorityStateMap
      .get(this.name.authority)
      ?.resourceMap.get(this.type)
      ?.get(this.name.key);
    if (!resourceState) {
      return;
    }
    resourceState.meta.clientStatus = 'DOES_NOT_EXIST';
    for (const listener of resourceState.watchers) {
      listener.onResourceDoesNotExist();
    }
  }
}
/**
 * Accumulated outcome of parsing one ADS response, built up by
 * AdsResponseParser and consumed by AdsCallState.
 */
interface AdsParseResult {
// Resource type looked up from the response's type_url.
type?: XdsResourceType;
// type_url copied from the response; each resource's type_url must match it.
typeUrl?: string;
// version_info copied from the response.
version?: string;
// nonce copied from the response.
nonce?: string;
// Human-readable validation errors, one per rejected resource.
errors: string[];
/**
 * authority -> set of keys
 */
resourcesSeen: Map<string, Set<string>>;
// True once at least one resource with a subscription decoded to a value.
haveValidResources: boolean;
}
/**
 * Responsible for parsing a single ADS response, one resource at a time.
 *
 * Usage: call processAdsResponseFields() once with the response, then
 * parseResource() for each entry in its `resources` list, then getResult().
 * Parsing updates the cached resource state of the owning XdsClient and
 * schedules watcher notifications on the next tick.
 */
class AdsResponseParser {
  private result: AdsParseResult = {
    errors: [],
    resourcesSeen: new Map(),
    haveValidResources: false
  };
  /** Timestamp recorded on every resource updated or NACKed by this response. */
  private updateTime = new Date();
  constructor(private adsCallState: AdsCallState) {}

  /**
   * Records the response-level fields (type, nonce, version).
   * @throws Error if the response's type_url is not a registered resource type.
   */
  processAdsResponseFields(message: DiscoveryResponse__Output) {
    const type = this.adsCallState.client.xdsClient.getResourceType(message.type_url);
    if (!type) {
      throw new Error(`Unexpected type URL ${message.type_url}`);
    }
    this.result.type = type;
    this.result.typeUrl = message.type_url;
    this.result.nonce = message.nonce;
    this.result.version = message.version_info;
  }

  /**
   * Decodes and validates one resource from the response. Errors are recorded
   * in the result rather than thrown.
   * @param index Position of the resource in the response, used in error text.
   * @param resource The packed resource to decode.
   */
  parseResource(index: number, resource: Any__Output) {
    const errorPrefix = `resource index ${index}:`;
    if (resource.type_url !== this.result.typeUrl) {
      this.result.errors.push(`${errorPrefix} incorrect resource type "${resource.type_url}" (should be "${this.result.typeUrl}")`);
      return;
    }
    if (!this.result.type) {
      this.adsCallState.client.trace('Received resource for uninitialized type ' + resource.type_url);
      return;
    }
    const decodeContext: XdsDecodeContext = {
      server: this.adsCallState.client.xdsServerConfig
    };
    let decodeResult: XdsDecodeResult;
    try {
      decodeResult = this.result.type.decode(decodeContext, resource);
    } catch (e) {
      this.result.errors.push(`${errorPrefix} ${(e as Error).message}`);
      return;
    }
    let parsedName: XdsResourceName;
    try {
      parsedName = parseXdsResourceName(decodeResult.name, this.result.type!.getTypeUrl());
    } catch (e) {
      this.result.errors.push(`${errorPrefix} ${(e as Error).message}`);
      return;
    }
    // The server responded for this name, so stop its does-not-exist timer.
    this.adsCallState.typeStates.get(this.result.type!)?.subscribedResources.get(parsedName.authority)?.get(parsedName.key)?.markSeen();
    if (this.result.type.allResourcesRequiredInSotW()) {
      if (!this.result.resourcesSeen.has(parsedName.authority)) {
        this.result.resourcesSeen.set(parsedName.authority, new Set());
      }
      this.result.resourcesSeen.get(parsedName.authority)!.add(parsedName.key);
    }
    const resourceState = this.adsCallState.client.xdsClient.authorityStateMap.get(parsedName.authority)?.resourceMap.get(this.result.type)?.get(parsedName.key);
    if (!resourceState) {
      // No subscription for this resource
      this.adsCallState.client.trace('Received resource of type ' + this.result.type.getTypeUrl() + ' named ' + decodeResult.name + ' with no subscriptions');
      return;
    }
    if (resourceState.deletionIgnored) {
      experimental.log(logVerbosity.INFO, `Received resource with previously ignored deletion: ${decodeResult.name}`);
      resourceState.deletionIgnored = false;
    }
    if (decodeResult.error) {
      this.result.errors.push(`${errorPrefix} ${decodeResult.error}`);
      // Notify asynchronously so watchers never run inside the parse loop.
      process.nextTick(() => {
        for (const watcher of resourceState.watchers) {
          watcher.onError({code: status.UNAVAILABLE, details: decodeResult.error!, metadata: new Metadata()});
        }
      });
      resourceState.meta.clientStatus = 'NACKED';
      resourceState.meta.failedVersion = this.result.version!;
      resourceState.meta.failedDetails = decodeResult.error;
      resourceState.meta.failedUpdateTime = this.updateTime;
      return;
    }
    if (!decodeResult.value) {
      this.adsCallState.client.trace('Failed to parse resource of type ' + this.result.type.getTypeUrl());
      return;
    }
    /* Serialized Buffers ({type: 'Buffer', data: [...]}) are rendered as a hex
     * string. Each byte is zero-padded to two digits so that the output is
     * unambiguous (e.g. [1, 16] -> "0110" rather than "110"). */
    this.adsCallState.client.trace('Parsed resource of type ' + this.result.type.getTypeUrl() + ': ' + JSON.stringify(decodeResult.value, (key, value) => (value && value.type === 'Buffer' && Array.isArray(value.data)) ? (value.data as number[]).map(n => n.toString(16).padStart(2, '0')).join('') : value, 2));
    this.result.haveValidResources = true;
    // An unchanged resource needs neither a cache update nor a notification.
    if (this.result.type.resourcesEqual(resourceState.cachedResource, decodeResult.value)) {
      return;
    }
    resourceState.cachedResource = decodeResult.value;
    resourceState.meta = {
      clientStatus: 'ACKED',
      rawResource: resource,
      updateTime: this.updateTime,
      version: this.result.version!
    };
    process.nextTick(() => {
      trace('Notifying ' + resourceState.watchers.size + ' watchers of ' + this.result.type?.getTypeUrl() + ' update');
      for (const watcher of resourceState.watchers) {
        watcher.onGenericResourceChanged(decodeResult.value!);
      }
    });
  }

  /** Returns the accumulated parse result. */
  getResult() {
    return this.result;
  }
}
// Bidirectional ADS stream: writes DiscoveryRequests, reads DiscoveryResponses.
type AdsCall = ClientDuplexStream<DiscoveryRequest, DiscoveryResponse__Output>;
/**
 * Per-resource-type state of one ADS call.
 */
interface ResourceTypeState {
// Nonce of the latest response for this type; echoed back as response_nonce.
nonce?: string;
// Validation error text for the latest response; when set, the next request
// carries it as error_detail.
error?: string;
/**
 * authority -> key -> timer
 */
subscribedResources: Map<string, Map<string, ResourceTimer>>;
}
/**
 * State for a single ADS stream: the set of subscribed resource names per
 * type, the latest nonce/error per type, and the handlers that process
 * responses and stream termination.
 */
class AdsCallState {
  public typeStates: Map<XdsResourceType, ResourceTypeState> = new Map();
  /** Whether at least one response has been received on this stream. */
  private receivedAnyResponse = false;
  /** Whether a request has been written yet; the node is only sent on the first. */
  private sentInitialMessage = false;

  /**
   * @param client The single-server client that owns this stream.
   * @param call The ADS duplex stream.
   * @param node Node information to send with the first request.
   */
  constructor(public client: XdsSingleServerClient, private call: AdsCall, private node: Node) {
    // Populate subscription map with existing subscriptions
    for (const [authority, authorityState] of client.xdsClient.authorityStateMap) {
      if (authorityState.client !== client) {
        continue;
      }
      for (const [type, typeMap] of authorityState.resourceMap) {
        for (const key of typeMap.keys()) {
          // Delay sending so that each type gets a single request below.
          this.subscribe(type, {authority, key}, true);
        }
      }
    }
    for (const type of this.typeStates.keys()) {
      this.updateNames(type);
    }
    call.on('data', (message: DiscoveryResponse__Output) => {
      this.handleResponseMessage(message);
    })
    call.on('status', (status: StatusObject) => {
      this.handleStreamStatus(status);
    });
    // Errors are also reported through 'status'; avoid an unhandled 'error'.
    call.on('error', () => {});
  }

  private trace(text: string) {
    this.client.trace(text);
  }

  /**
   * Processes one DiscoveryResponse: parses every resource, records the
   * nonce and any validation errors, handles resources omitted from a
   * state-of-the-world update, and sends the resulting ACK/NACK.
   */
  private handleResponseMessage(message: DiscoveryResponse__Output) {
    /* Record that the server has responded. This suppresses the "stream
     * failed without a response" error in handleStreamStatus and resets the
     * client's ADS backoff, mirroring what the LRS stream does on a response. */
    this.receivedAnyResponse = true;
    this.client.onAdsStreamReceivedMessage();
    const parser = new AdsResponseParser(this);
    try {
      parser.processAdsResponseFields(message);
    } catch (e) {
      this.trace('ADS response field parsing failed for type ' + message.type_url);
      return;
    }
    for (const [index, resource] of message.resources.entries()) {
      parser.parseResource(index, resource);
    }
    const result = parser.getResult();
    const typeState = this.typeStates.get(result.type!);
    if (!typeState) {
      this.trace('Type state not found for type ' + result.type!.getTypeUrl());
      return;
    }
    typeState.nonce = result.nonce;
    if (result.errors.length > 0) {
      typeState.error = `xDS response validation errors: [${result.errors.join('; ')}]`;
    } else {
      delete typeState.error;
    }
    // Delete resources not seen in update if needed
    if (result.type!.allResourcesRequiredInSotW()) {
      for (const [authority, authorityState] of this.client.xdsClient.authorityStateMap) {
        if (authorityState.client !== this.client) {
          continue;
        }
        const typeMap = authorityState.resourceMap.get(result.type!);
        if (!typeMap) {
          continue;
        }
        for (const [key, resourceState] of typeMap) {
          if (result.resourcesSeen.get(authority)?.has(key)) {
            continue;
          }
          /* Do nothing for resources that have no cached value. Those are
           * handled by the resource timer. */
          if (!resourceState.cachedResource) {
            continue;
          }
          if (this.client.ignoreResourceDeletion) {
            experimental.log(logVerbosity.ERROR, 'Ignoring nonexistent resource ' + xdsResourceNameToString({authority, key}, result.type!.getTypeUrl()));
            resourceState.deletionIgnored = true;
          } else {
            resourceState.meta.clientStatus = 'DOES_NOT_EXIST';
            process.nextTick(() => {
              for (const watcher of resourceState.watchers) {
                watcher.onResourceDoesNotExist();
              }
            });
          }
        }
      }
    }
    // Only advance the accepted version when something in the update was usable.
    if (result.haveValidResources || result.errors.length === 0) {
      this.client.resourceTypeVersionMap.set(result.type!, result.version!);
    }
    this.updateNames(result.type!);
  }

  /** Yields every watcher of every resource subscribed on this stream. */
  private* allWatchers() {
    for (const [type, typeState] of this.typeStates) {
      for (const [authority, authorityMap] of typeState.subscribedResources) {
        for (const key of authorityMap.keys()) {
          yield* this.client.xdsClient.authorityStateMap.get(authority)?.resourceMap.get(type)?.get(key)?.watchers ?? [];
        }
      }
    }
  }

  /**
   * Handles stream termination. Watchers are only told about the failure if
   * the stream ended with a non-OK status before any response arrived.
   */
  private handleStreamStatus(streamStatus: StatusObject) {
    this.trace(
      'ADS stream ended. code=' + streamStatus.code + ' details= ' + streamStatus.details
    );
    if (streamStatus.code !== status.OK && !this.receivedAnyResponse) {
      for (const watcher of this.allWatchers()) {
        watcher.onError(streamStatus);
      }
    }
    this.client.handleAdsStreamEnd();
  }

  /** Returns true if any resource of any type is currently subscribed. */
  hasSubscribedResources(): boolean {
    for (const typeState of this.typeStates.values()) {
      for (const authorityMap of typeState.subscribedResources.values()) {
        if (authorityMap.size > 0) {
          return true;
        }
      }
    }
    return false;
  }

  /**
   * Adds a resource name to the subscription set for its type.
   * @param type The resource type.
   * @param name The resource name to subscribe to.
   * @param delaySend If true, do not send an updated request immediately; the
   *     caller is expected to call updateNames() itself.
   */
  subscribe(type: XdsResourceType, name: XdsResourceName, delaySend: boolean = false) {
    let typeState = this.typeStates.get(type);
    if (!typeState) {
      typeState = {
        nonce: '',
        subscribedResources: new Map()
      };
      this.typeStates.set(type, typeState);
    }
    let authorityMap = typeState.subscribedResources.get(name.authority);
    if (!authorityMap) {
      authorityMap = new Map();
      typeState.subscribedResources.set(name.authority, authorityMap);
    }
    if (!authorityMap.has(name.key)) {
      const timer = new ResourceTimer(this, type, name);
      authorityMap.set(name.key, timer);
      if (!delaySend) {
        this.updateNames(type);
      }
    }
  }

  /**
   * Removes a resource name from the subscription set for its type and sends
   * an updated request. Does nothing if the type or authority is unknown.
   */
  unsubscribe(type: XdsResourceType, name: XdsResourceName) {
    const typeState = this.typeStates.get(type);
    if (!typeState) {
      return;
    }
    const authorityMap = typeState.subscribedResources.get(name.authority);
    if (!authorityMap) {
      return;
    }
    /* Cancel the pending does-not-exist timer before dropping it. Otherwise it
     * would still fire and could report DOES_NOT_EXIST for a resource that has
     * since been re-subscribed under a fresh timer. */
    authorityMap.get(name.key)?.maybeCancelTimer();
    authorityMap.delete(name.key);
    if (authorityMap.size === 0) {
      typeState.subscribedResources.delete(name.authority);
    }
    this.updateNames(type);
  }

  /** Returns the full string names of all subscribed resources of a type. */
  resourceNamesForRequest(type: XdsResourceType): string[] {
    const typeState = this.typeStates.get(type);
    if (!typeState) {
      return [];
    }
    const result: string[] = [];
    for (const [authority, authorityMap] of typeState.subscribedResources) {
      for (const key of authorityMap.keys()) {
        result.push(xdsResourceNameToString({authority, key}, type.getTypeUrl()));
      }
    }
    return result;
  }

  /**
   * Writes a DiscoveryRequest for the given type carrying the current
   * subscription list, the latest nonce and accepted version, and the latest
   * validation error if there is one. Does nothing for an unknown type.
   */
  updateNames(type: XdsResourceType) {
    const typeState = this.typeStates.get(type);
    if (!typeState) {
      return;
    }
    const request: DiscoveryRequest = {
      node: this.sentInitialMessage ? null : this.node,
      type_url: type.getFullTypeUrl(),
      response_nonce: typeState.nonce,
      resource_names: this.resourceNamesForRequest(type),
      version_info: this.client.resourceTypeVersionMap.get(type),
      error_detail: typeState.error ? { code: status.UNAVAILABLE, message: typeState.error} : null
    };
    this.trace('Sending discovery request: ' + JSON.stringify(request, undefined, 2));
    this.call.write(request);
    this.sentInitialMessage = true;
  }

  /** Half-closes the stream. */
  end() {
    this.call.end();
  }

  /**
   * Should be called when the channel state is READY after starting the
   * stream.
   */
  markStreamStarted() {
    for (const typeState of this.typeStates.values()) {
      for (const authorityMap of typeState.subscribedResources.values()) {
        for (const resourceTimer of authorityMap.values()) {
          resourceTimer.markAdsStreamStarted();
        }
      }
    }
  }
}
// Bidirectional LRS stream: writes LoadStatsRequests, reads LoadStatsResponses.
type LrsCall = ClientDuplexStream<LoadStatsRequest, LoadStatsResponse__Output>;
/**
 * Compares two localities field by field.
 * @returns true when region, zone and sub_zone are all strictly equal.
 */
function localityEqual(
  loc1: Locality__Output,
  loc2: Locality__Output
): boolean {
  if (loc1.region !== loc2.region) {
    return false;
  }
  if (loc1.zone !== loc2.zone) {
    return false;
  }
  return loc1.sub_zone === loc2.sub_zone;
}
/**
 * Handle for recording dropped calls against one cluster's load report.
 */
export interface XdsClusterDropStats {
/** Records one dropped call that has no drop category. */
addUncategorizedCallDropped(): void;
/** Records one dropped call under the given category. */
addCallDropped(category: string): void;
}
/**
 * Handle for recording call counts against one locality of a cluster.
 */
export interface XdsClusterLocalityStats {
/** Records that a call was started. */
addCallStarted(): void;
/** Records that a call finished; `fail` selects the failed vs. succeeded counter. */
addCallFinished(fail: boolean): void;
}
/**
 * Per-category drop count entry placed in ClusterStats.dropped_requests.
 */
interface DroppedRequests {
// Drop category name.
category: string;
// Number of calls dropped under this category in the reporting interval.
dropped_count: number;
}
/**
 * Per-locality call counts placed in ClusterStats.upstream_locality_stats.
 */
interface UpstreamLocalityStats {
locality: Locality;
// Calls started during the reporting interval.
total_issued_requests: number;
// Calls that finished successfully during the reporting interval.
total_successful_requests: number;
// Calls that finished with a failure during the reporting interval.
total_error_requests: number;
// Calls still in progress when the report was built.
total_requests_in_progress: number;
}
/**
* An interface representing the ClusterStats message type, restricted to the
* fields used in this module to ensure compatibility with both v2 and v3 APIs.
*/
interface ClusterStats {
cluster_name: string;
// Filled from the EDS service name the stats were registered under.
cluster_service_name: string;
dropped_requests: DroppedRequests[];
// Sum of all categorized drops plus uncategorized drops.
total_dropped_requests: number;
upstream_locality_stats: UpstreamLocalityStats[];
// Time elapsed since the previous report for this cluster.
load_report_interval: Duration
}
/**
 * Mutable call counters for one locality within a cluster's load report.
 */
interface ClusterLocalityStats {
locality: Locality__Output;
// The started/succeeded/failed counters are reset to 0 each time they are
// included in a report.
callsStarted: number;
callsSucceeded: number;
callsFailed: number;
// Incremented on call start and decremented on call finish; not reset when
// reporting.
callsInProgress: number;
// Decremented on removal; the entry is deleted when it reaches 0.
refcount: number;
}
/**
 * Accumulated load data for one (cluster, EDS service) pair between reports.
 */
interface ClusterLoadReport {
// category -> number of dropped calls; cleared after each report.
callsDropped: Map<string, number>;
// Drops without a category; reset to 0 after each report.
uncategorizedCallsDropped: number;
localityStats: Set<ClusterLocalityStats>;
// process.hrtime() tuple taken when the current reporting interval began.
intervalStart: [number, number];
}
/**
 * One entry of ClusterLoadReportMap, identified by clusterName + edsServiceName.
 */
interface StatsMapEntry {
clusterName: string;
edsServiceName: string;
// Incremented by getOrCreate and decremented by unref; the entry is removed
// when it reaches 0.
refCount: number;
stats: ClusterLoadReport;
}
/**
 * Reference-counted collection of load reports keyed by the pair
 * (clusterName, edsServiceName). Lookups are linear scans over the entries.
 */
class ClusterLoadReportMap {
  private statsMap: Set<StatsMapEntry> = new Set();

  /** Finds the entry for the given key pair, or undefined if there is none. */
  private findEntry(clusterName: string, edsServiceName: string): StatsMapEntry | undefined {
    for (const candidate of this.statsMap) {
      const sameCluster = candidate.clusterName === clusterName;
      if (sameCluster && candidate.edsServiceName === edsServiceName) {
        return candidate;
      }
    }
    return undefined;
  }

  /**
   * Looks up a load report without affecting its refcount.
   * @returns The report, or undefined if no entry matches.
   */
  get(
    clusterName: string,
    edsServiceName: string
  ): ClusterLoadReport | undefined {
    return this.findEntry(clusterName, edsServiceName)?.stats;
  }

  /**
   * Get the indicated map entry if it exists, or create a new one if it does
   * not. Increments the refcount of that entry, so a call to this method
   * should correspond to a later call to unref
   * @param clusterName
   * @param edsServiceName
   * @returns The existing or newly created load report.
   */
  getOrCreate(clusterName: string, edsServiceName: string): ClusterLoadReport {
    const existing = this.findEntry(clusterName, edsServiceName);
    if (existing) {
      existing.refCount += 1;
      return existing.stats;
    }
    const freshReport: ClusterLoadReport = {
      callsDropped: new Map<string, number>(),
      uncategorizedCallsDropped: 0,
      localityStats: new Set(),
      intervalStart: process.hrtime(),
    };
    const freshEntry: StatsMapEntry = {
      clusterName,
      edsServiceName,
      refCount: 1,
      stats: freshReport,
    };
    this.statsMap.add(freshEntry);
    return freshReport;
  }

  /** Iterates over [key, report] pairs in insertion order. */
  *entries(): IterableIterator<
    [{ clusterName: string; edsServiceName: string }, ClusterLoadReport]
  > {
    for (const { clusterName, edsServiceName, stats } of this.statsMap) {
      yield [{ clusterName, edsServiceName }, stats];
    }
  }

  /**
   * Drops one reference to the matching entry and removes the entry when its
   * refcount reaches zero. Does nothing if no entry matches.
   */
  unref(clusterName: string, edsServiceName: string) {
    const entry = this.findEntry(clusterName, edsServiceName);
    if (!entry) {
      return;
    }
    entry.refCount -= 1;
    if (entry.refCount === 0) {
      this.statsMap.delete(entry);
    }
  }

  /** Number of entries currently held. */
  get size() {
    return this.statsMap.size;
  }
}
/**
 * State for a single LRS stream. Sends an initial (empty) load report on
 * construction and then periodic reports at the interval requested by the
 * server in its responses.
 */
class LrsCallState {
  /** Interval timer driving periodic reports; null until a response arrives. */
  private statsTimer: NodeJS.Timeout | null = null;
  /** Whether a request has been written yet; the node is only sent on the first. */
  private sentInitialMessage = false;

  /**
   * @param client The single-server client that owns this stream.
   * @param call The LRS duplex stream.
   * @param node Node information to send with the first request.
   */
  constructor(private client: XdsSingleServerClient, private call: LrsCall, private node: Node) {
    call.on('data', (message: LoadStatsResponse__Output) => {
      this.handleResponseMessage(message);
    })
    call.on('status', (status: StatusObject) => {
      this.handleStreamStatus(status);
    });
    // Errors are also reported through 'status'; avoid an unhandled 'error'.
    call.on('error', () => {});
    this.sendStats();
  }

  /**
   * Stops the periodic report timer.
   * @returns Always null, so callers can assign the result to the field that
   *     held this object.
   */
  destroy() {
    if (this.statsTimer) {
      clearInterval(this.statsTimer);
      this.statsTimer = null;
    }
    return null;
  }

  private handleStreamStatus(status: StatusObject) {
    this.client.trace(
      'LRS stream ended. code=' + status.code + ' details= ' + status.details
    );
    this.client.handleLrsStreamEnd();
  }

  /**
   * Handles a LoadStatsResponse: resets the client's LRS backoff, (re)starts
   * the report timer if needed, and stores the response as the latest
   * settings.
   */
  private handleResponseMessage(message: LoadStatsResponse__Output) {
    this.client.trace('Received LRS response');
    this.client.onLrsStreamReceivedMessage();
    if (
      !this.statsTimer ||
      message.load_reporting_interval?.seconds !==
        this.client.latestLrsSettings?.load_reporting_interval?.seconds ||
      message.load_reporting_interval?.nanos !==
        this.client.latestLrsSettings?.load_reporting_interval?.nanos
    ) {
      /* Only reset the timer if the interval has changed or was not set
       * before. */
      if (this.statsTimer) {
        clearInterval(this.statsTimer);
      }
      /* Convert a google.protobuf.Duration to a number of milliseconds for
       * use with setInterval. */
      const loadReportingIntervalMs =
        Number.parseInt(message.load_reporting_interval!.seconds) * 1000 +
        message.load_reporting_interval!.nanos / 1_000_000;
      this.client.trace('Received LRS response with load reporting interval ' + loadReportingIntervalMs + ' ms');
      this.statsTimer = setInterval(() => {
        this.sendStats();
      }, loadReportingIntervalMs);
    }
    this.client.latestLrsSettings = message;
  }

  /** Writes one LoadStatsRequest, including the node on the first message only. */
  private sendLrsMessage(clusterStats: ClusterStats[]) {
    const request: LoadStatsRequest = {
      node: this.sentInitialMessage ? null : this.node,
      cluster_stats: clusterStats
    };
    this.client.trace('Sending LRS message ' + JSON.stringify(request, undefined, 2));
    this.call.write(request);
    this.sentInitialMessage = true;
  }

  private get latestLrsSettings() {
    return this.client.latestLrsSettings;
  }

  /**
   * Builds and sends a load report from the client's accumulated stats,
   * resetting the per-interval counters of every cluster it visits. If no
   * LRS response has been received yet, an empty report is sent instead.
   */
  private sendStats() {
    if (!this.latestLrsSettings) {
      this.sendLrsMessage([]);
      return;
    }
    const clusterStats: ClusterStats[] = [];
    for (const [
      { clusterName, edsServiceName },
      stats,
    ] of this.client.clusterStatsMap.entries()) {
      /* Report a cluster if the server asked for all clusters or listed it by
       * name. This must be a membership test: comparing indexOf() with "> 0"
       * would skip the cluster listed first (index 0). */
      if (
        this.latestLrsSettings.send_all_clusters ||
        this.latestLrsSettings.clusters.includes(clusterName)
      ) {
        const upstreamLocalityStats: UpstreamLocalityStats[] = [];
        for (const localityStats of stats.localityStats) {
          // Skip localities with 0 requests
          if (
            localityStats.callsStarted > 0 ||
            localityStats.callsSucceeded > 0 ||
            localityStats.callsFailed > 0
          ) {
            upstreamLocalityStats.push({
              locality: localityStats.locality,
              total_issued_requests: localityStats.callsStarted,
              total_successful_requests: localityStats.callsSucceeded,
              total_error_requests: localityStats.callsFailed,
              total_requests_in_progress: localityStats.callsInProgress,
            });
            localityStats.callsStarted = 0;
            localityStats.callsSucceeded = 0;
            localityStats.callsFailed = 0;
          }
        }
        const droppedRequests: DroppedRequests[] = [];
        let totalDroppedRequests = 0;
        for (const [category, count] of stats.callsDropped.entries()) {
          if (count > 0) {
            droppedRequests.push({
              category,
              dropped_count: count,
            });
            totalDroppedRequests += count;
          }
        }
        totalDroppedRequests += stats.uncategorizedCallsDropped;
        // Clear out dropped call stats after sending them
        stats.callsDropped.clear();
        stats.uncategorizedCallsDropped = 0;
        // Elapsed [seconds, nanoseconds] since the previous report.
        const interval = process.hrtime(stats.intervalStart);
        stats.intervalStart = process.hrtime();
        // Skip clusters with 0 requests
        if (upstreamLocalityStats.length > 0 || totalDroppedRequests > 0) {
          clusterStats.push({
            cluster_name: clusterName,
            cluster_service_name: edsServiceName,
            dropped_requests: droppedRequests,
            total_dropped_requests: totalDroppedRequests,
            upstream_locality_stats: upstreamLocalityStats,
            load_report_interval: {
              seconds: interval[0],
              nanos: interval[1],
            },
          });
        }
      }
    }
    this.sendLrsMessage(clusterStats);
  }
}
/**
 * Client for a single xDS server. Owns one channel shared by the ADS and LRS
 * service clients, manages the lifetime and restart backoff of both streams,
 * and holds the load-report statistics sent over LRS.
 */
class XdsSingleServerClient {
  /** True if the server config lists the 'ignore_resource_deletion' feature. */
  public ignoreResourceDeletion: boolean;
  private adsBackoff: BackoffTimeout;
  private lrsBackoff: BackoffTimeout;
  private adsClient: AggregatedDiscoveryServiceClient;
  private adsCallState: AdsCallState | null = null;
  private lrsClient: LoadReportingServiceClient;
  private lrsCallState: LrsCallState | null = null;
  public clusterStatsMap = new ClusterLoadReportMap();
  /** Most recent LRS response, or null if none has been received. */
  public latestLrsSettings: LoadStatsResponse__Output | null = null;
  /**
   * The number of authorities that are using this client. Streams should only
   * be started if refcount > 0
   */
  private refcount = 0;
  /**
   * Map of type to latest accepted version string for that type
   */
  public resourceTypeVersionMap: Map<XdsResourceType, string> = new Map();

  /**
   * @param xdsClient The owning XdsClient.
   * @param bootstrapNode Node information from the bootstrap config.
   * @param xdsServerConfig Configuration of the server to connect to.
   */
  constructor(public xdsClient: XdsClient, bootstrapNode: Node, public xdsServerConfig: XdsServerConfig) {
    this.adsBackoff = new BackoffTimeout(() => {
      this.maybeStartAdsStream();
    });
    this.adsBackoff.unref();
    this.lrsBackoff = new BackoffTimeout(() => {
      this.maybeStartLrsStream();
    });
    this.lrsBackoff.unref();
    this.ignoreResourceDeletion = xdsServerConfig.server_features.includes('ignore_resource_deletion');
    const channelArgs = {
      // 5 minutes
      'grpc.keepalive_time_ms': 5 * 60 * 1000
    }
    const credentialsConfigs = xdsServerConfig.channel_creds;
    let channelCreds: ChannelCredentials | null = null;
    // Use the first credentials config with a supported type.
    for (const config of credentialsConfigs) {
      if (config.type === 'google_default') {
        channelCreds = createGoogleDefaultCredentials();
        break;
      } else if (config.type === 'insecure') {
        channelCreds = ChannelCredentials.createInsecure();
        break;
      }
    }
    const serverUri = this.xdsServerConfig.server_uri
    this.trace('Starting xDS client connected to server URI ' + this.xdsServerConfig.server_uri);
    /* Bootstrap validation rules guarantee that a matching channel credentials
     * config exists in the list. */
    const channel = new Channel(serverUri, channelCreds!, channelArgs);
    const protoDefinitions = loadAdsProtos();
    this.adsClient = new protoDefinitions.envoy.service.discovery.v3.AggregatedDiscoveryService(
      serverUri,
      channelCreds!,
      {channelOverride: channel}
    );
    channel.watchConnectivityState(channel.getConnectivityState(false), Infinity, () => {
      this.handleAdsConnectivityStateUpdate();
    });
    this.lrsClient = new protoDefinitions.envoy.service.load_stats.v3.LoadReportingService(
      serverUri,
      channelCreds!,
      {channelOverride: channel}
    );
  }

  /**
   * Reacts to a channel connectivity change: starts resource timers on READY,
   * reports an error to all of this client's watchers on TRANSIENT_FAILURE,
   * and re-registers itself for the next state change.
   */
  private handleAdsConnectivityStateUpdate() {
    const state = this.adsClient.getChannel().getConnectivityState(false);
    if (state === connectivityState.READY) {
      this.adsCallState?.markStreamStarted();
    }
    if (state === connectivityState.TRANSIENT_FAILURE) {
      for (const authorityState of this.xdsClient.authorityStateMap.values()) {
        if (authorityState.client !== this) {
          continue;
        }
        for (const typeMap of authorityState.resourceMap.values()) {
          for (const resourceState of typeMap.values()) {
            for (const watcher of resourceState.watchers) {
              watcher.onError({
                code: status.UNAVAILABLE,
                details: 'No connection established to xDS server',
                metadata: new Metadata()
              });
            }
          }
        }
      }
    }
    this.adsClient.getChannel().watchConnectivityState(state, Infinity, () => {
      this.handleAdsConnectivityStateUpdate();
    });
  }

  /** Stops and resets the ADS restart backoff. */
  onAdsStreamReceivedMessage() {
    this.adsBackoff.stop();
    this.adsBackoff.reset();
  }

  /** Clears the ADS call state and restarts the stream if no backoff is pending. */
  handleAdsStreamEnd() {
    this.adsCallState = null;
    /* The backoff timer would start the stream when it finishes. If it is not
     * running, restart the stream immediately. */
    if (!this.adsBackoff.isRunning()) {
      this.maybeStartAdsStream();
    }
  }

  /** Starts the ADS stream unless one is already active or refcount < 1. */
  private maybeStartAdsStream() {
    if (this.adsCallState || this.refcount < 1) {
      return;
    }
    this.trace('Starting ADS stream');
    const metadata = new Metadata({waitForReady: true});
    const call = this.adsClient.StreamAggregatedResources(metadata);
    this.adsCallState = new AdsCallState(this, call, this.xdsClient.adsNode!);
    this.adsBackoff.runOnce();
  }

  /** Stops and resets the LRS restart backoff. */
  onLrsStreamReceivedMessage() {
    this.lrsBackoff.stop();
    this.lrsBackoff.reset();
  }

  /** Tears down the LRS call state and restarts the stream if no backoff is pending. */
  handleLrsStreamEnd() {
    this.lrsCallState = this.lrsCallState ? this.lrsCallState.destroy() : null;
    /* The backoff timer would start the stream when it finishes. If it is not
     * running, restart the stream immediately. */
    if (!this.lrsBackoff.isRunning()) {
      this.maybeStartLrsStream();
    }
  }

  /**
   * Starts the LRS stream unless one is already active, refcount < 1, or
   * there are no cluster stats to report.
   */
  private maybeStartLrsStream() {
    if (this.lrsCallState || this.refcount < 1 || this.clusterStatsMap.size < 1) {
      return;
    }
    this.trace('Starting LRS stream');
    const metadata = new Metadata({waitForReady: true});
    const call = this.lrsClient.StreamLoadStats(metadata);
    this.lrsCallState = new LrsCallState(this, call, this.xdsClient.lrsNode!);
    this.lrsBackoff.runOnce();
  }

  /** Emits a trace line prefixed with this client's server URI. */
  trace(text: string) {
    trace(this.xdsServerConfig.server_uri + ' ' + text);
  }

  /** Subscribes to a resource, starting the ADS stream first if necessary. */
  subscribe(type: XdsResourceType, name: XdsResourceName) {
    this.trace('subscribe(type=' + type.getTypeUrl() + ', name=' + xdsResourceNameToString(name, type.getTypeUrl()) + ')');
    this.trace(JSON.stringify(name));
    this.maybeStartAdsStream();
    this.adsCallState?.subscribe(type, name);
  }

  /**
   * Unsubscribes from a resource and ends the ADS stream if no subscriptions
   * remain.
   */
  unsubscribe(type: XdsResourceType, name: XdsResourceName) {
    this.trace('unsubscribe(type=' + type.getTypeUrl() + ', name=' + xdsResourceNameToString(name, type.getTypeUrl()) + ')');
    this.adsCallState?.unsubscribe(type, name);
    if (this.adsCallState && !this.adsCallState.hasSubscribedResources()) {
      this.adsCallState.end();
      this.adsCallState = null;
    }
  }

  /** Registers one more user of this client. */
  ref() {
    this.refcount += 1;
  }

  /** Releases one user of this client. */
  unref() {
    this.refcount -= 1;
  }

  /**
   * Registers interest in drop statistics for a cluster and returns a handle
   * for recording drops. Should be paired with removeClusterDropStats().
   */
  addClusterDropStats(
    clusterName: string,
    edsServiceName: string
  ): XdsClusterDropStats {
    this.trace('addClusterDropStats(clusterName=' + clusterName + ', edsServiceName=' + edsServiceName + ')');
    const clusterStats = this.clusterStatsMap.getOrCreate(
      clusterName,
      edsServiceName
    );
    this.maybeStartLrsStream();
    return {
      addUncategorizedCallDropped: () => {
        clusterStats.uncategorizedCallsDropped += 1;
      },
      addCallDropped: (category) => {
        const prevCount = clusterStats.callsDropped.get(category) ?? 0;
        clusterStats.callsDropped.set(category, prevCount + 1);
      },
    };
  }

  /** Releases the reference taken by addClusterDropStats(). */
  removeClusterDropStats(clusterName: string, edsServiceName: string) {
    this.trace('removeClusterDropStats(clusterName=' + clusterName + ', edsServiceName=' + edsServiceName + ')');
    this.clusterStatsMap.unref(clusterName, edsServiceName);
  }

  /**
   * Registers interest in per-locality call statistics and returns a handle
   * for recording calls. Should be paired with removeClusterLocalityStats().
   */
  addClusterLocalityStats(
    clusterName: string,
    edsServiceName: string,
    locality: Locality__Output
  ): XdsClusterLocalityStats {
    this.trace('addClusterLocalityStats(clusterName=' + clusterName + ', edsServiceName=' + edsServiceName + ', locality=' + JSON.stringify(locality) + ')');
    const clusterStats = this.clusterStatsMap.getOrCreate(
      clusterName,
      edsServiceName
    );
    this.maybeStartLrsStream();
    let localityStats: ClusterLocalityStats | null = null;
    for (const statsObj of clusterStats.localityStats) {
      if (localityEqual(locality, statsObj.locality)) {
        localityStats = statsObj;
        break;
      }
    }
    if (localityStats === null) {
      localityStats = {
        locality: locality,
        callsInProgress: 0,
        callsStarted: 0,
        callsSucceeded: 0,
        callsFailed: 0,
        refcount: 0,
      };
      clusterStats.localityStats.add(localityStats);
    }
    /* Take a reference for this caller. removeClusterLocalityStats() decrements
     * the count and deletes the entry when it reaches 0; without this
     * increment the count would go negative and the entry would never be
     * removed. */
    localityStats.refcount += 1;
    /* Help the compiler understand that this object is always non-null in the
     * closure */
    const finalLocalityStats: ClusterLocalityStats = localityStats;
    return {
      addCallStarted: () => {
        finalLocalityStats.callsStarted += 1;
        finalLocalityStats.callsInProgress += 1;
      },
      addCallFinished: (fail) => {
        if (fail) {
          finalLocalityStats.callsFailed += 1;
        } else {
          finalLocalityStats.callsSucceeded += 1;
        }
        finalLocalityStats.callsInProgress -= 1;
      },
    };
  }

  /**
   * Releases the references taken by addClusterLocalityStats(): one on the
   * locality entry (deleted at refcount 0) and one on the cluster entry.
   * Does nothing if the cluster entry no longer exists.
   */
  removeClusterLocalityStats(
    clusterName: string,
    edsServiceName: string,
    locality: Locality__Output
  ) {
    this.trace('removeClusterLocalityStats(clusterName=' + clusterName + ', edsServiceName=' + edsServiceName + ', locality=' + JSON.stringify(locality) + ')');
    const clusterStats = this.clusterStatsMap.get(clusterName, edsServiceName);
    if (!clusterStats) {
      return;
    }
    for (const statsObj of clusterStats.localityStats) {
      if (localityEqual(locality, statsObj.locality)) {
        statsObj.refcount -= 1;
        if (statsObj.refcount === 0) {
          clusterStats.localityStats.delete(statsObj);
        }
        break;
      }
    }
    this.clusterStatsMap.unref(clusterName, edsServiceName);
  }
}
/**
 * Pairs an xDS server configuration with the single-server client for it.
 */
interface ClientMapEntry {
serverConfig: XdsServerConfig;
client: XdsSingleServerClient;
}
// Status of a resource from this client's point of view, stored in
// ResourceMetadata.clientStatus.
type ClientResourceStatus = 'REQUESTED' | 'DOES_NOT_EXIST' | 'ACKED' | 'NACKED';
/**
 * Bookkeeping about a single resource: its current status plus details of
 * the last accepted and last rejected update.
 */
interface ResourceMetadata {
clientStatus: ClientResourceStatus;
// The packed resource as received, recorded when an update is accepted.
rawResource?: Any__Output;
// Time of the response that carried the accepted update.
updateTime?: Date;
// version_info of the response that carried the accepted update.
version?: string;
// version_info of the response whose update was rejected.
failedVersion?: string;
// Validation error text of the rejected update.
failedDetails?: string;
// Time of the response that carried the rejected update.
failedUpdateTime?: Date;
}
/**
 * Per-resource state kept for every (authority, type, key) that has at least
 * one watcher.
 */
interface ResourceState {
/** Watchers currently registered for this resource. */
watchers: Set<ResourceWatcherInterface>;
/**
 * Cached resource value, or null when none is cached (the initial state).
 * When non-null, it is delivered asynchronously to newly added watchers.
 */
cachedResource: object | null;
/** Status metadata for this resource; starts with clientStatus 'REQUESTED'. */
meta: ResourceMetadata;
/**
 * Starts out false.
 * NOTE(review): where this flag is set and what it controls is not visible in
 * this part of the file; confirm before relying on it.
 */
deletionIgnored: boolean;
}
/**
 * State kept per authority while that authority has at least one watched
 * resource.
 */
interface AuthorityState {
/**
 * Client used for this authority's subscriptions. A reference is taken on it
 * when the authority state is created and released when the authority's last
 * watched resource is removed.
 */
client: XdsSingleServerClient;
/**
* type -> key -> state
*/
resourceMap: Map<XdsResourceType, Map<string, ResourceState>>;
}
/** Value sent as `user_agent_name` in the node descriptions built by `XdsClient`. */
const userAgentName = 'gRPC Node Pure JS';
/**
 * Builds the certificate provider described by a bootstrap certificate
 * provider configuration. The only plugin name handled is 'file_watcher'.
 *
 * @param config Certificate provider configuration from the bootstrap info.
 * @returns A new file watcher certificate provider built from `config.config`.
 * @throws Error if `config.pluginName` is anything other than 'file_watcher'.
 */
function createCertificateProvider(config: CertificateProviderConfig) {
  if (config.pluginName === 'file_watcher') {
    return new FileWatcherCertificateProvider(config.config);
  }
  throw new Error(`Unexpected certificate provider plugin name ${config.pluginName}`);
}
/**
 * Entry point for watching xDS resources and for recording load-report
 * statistics.
 *
 * Resource watches are grouped by authority. Each authority is served by an
 * `XdsSingleServerClient`, and a client is shared between authorities whose
 * selected server configurations compare equal.
 */
export class XdsClient {
  /**
   * authority -> authority state
   */
  public authorityStateMap: Map<string, AuthorityState> = new Map();
  /** Every single-server client created so far, with the config it was built from. */
  private clients: ClientMapEntry[] = [];
  /** Resource types seen by `watchResource`, keyed by both type URL and full type URL. */
  private typeRegistry: Map<string, XdsResourceType> = new Map();
  /** Bootstrap configuration; loaded on first use unless supplied to the constructor. */
  private bootstrapInfo: BootstrapInfo | null = null;
  /** instance name -> certificate provider, filled on the first `getCertificateProvider` call. */
  private certificateProviderRegistry: Map<string, CertificateProvider> = new Map();
  /** Whether `certificateProviderRegistry` has been filled from the bootstrap info. */
  private certificateProviderRegistryPopulated = false;

  /**
   * @param bootstrapInfoOverride Bootstrap configuration to use instead of the
   * one returned by `loadBootstrapInfo()`.
   */
  constructor(bootstrapInfoOverride?: BootstrapInfo) {
    if (bootstrapInfoOverride) {
      this.bootstrapInfo = bootstrapInfoOverride;
    }
    registerXdsClientWithCsds(this);
  }

  /**
   * Returns the bootstrap configuration, loading and caching it on first use
   * when no override was given to the constructor.
   */
  private getBootstrapInfo() {
    if (!this.bootstrapInfo) {
      this.bootstrapInfo = loadBootstrapInfo();
    }
    return this.bootstrapInfo;
  }

  /**
   * Node description for ADS: the bootstrap node plus user agent fields and
   * the ADS client feature list. Returns undefined while the bootstrap info
   * has not been loaded; this getter never triggers loading.
   */
  get adsNode(): Node | undefined {
    if (!this.bootstrapInfo) {
      return undefined;
    }
    return {
      ...this.bootstrapInfo.node,
      user_agent_name: userAgentName,
      user_agent_version: clientVersion,
      client_features: ['envoy.lb.does_not_support_overprovisioning'],
    };
  }

  /**
   * Node description for LRS: the bootstrap node plus user agent fields and
   * the LRS client feature list. Returns undefined while the bootstrap info
   * has not been loaded; this getter never triggers loading.
   */
  get lrsNode(): Node | undefined {
    if (!this.bootstrapInfo) {
      return undefined;
    }
    return {
      ...this.bootstrapInfo.node,
      user_agent_name: userAgentName,
      user_agent_version: clientVersion,
      client_features: ['envoy.lrs.supports_send_all_clusters'],
    };
  }

  /**
   * Returns the client for the server that serves `authority`, creating it if
   * no existing client was built from an equal server configuration.
   *
   * The authority 'old:' maps to the first top-level server. Any other
   * authority must be listed in the bootstrap `authorities`; its first server
   * is used, falling back to the first top-level server.
   *
   * @param authority Authority string of a parsed resource name.
   * @throws Error if the authority is not 'old:' and is not an own key of the
   * bootstrap authorities.
   */
  private getOrCreateClient(authority: string): XdsSingleServerClient {
    const bootstrapInfo = this.getBootstrapInfo();
    let serverConfig: XdsServerConfig;
    if (authority === 'old:') {
      serverConfig = bootstrapInfo.xdsServers[0];
    } else {
      /* Use an own-property check rather than `in`: `in` also matches members
       * inherited from Object.prototype, so authorities such as "constructor"
       * or "toString" would wrongly be treated as configured. */
      if (Object.prototype.hasOwnProperty.call(bootstrapInfo.authorities, authority)) {
        serverConfig = bootstrapInfo.authorities[authority].xdsServers?.[0] ?? bootstrapInfo.xdsServers[0];
      } else {
        throw new Error(`Authority ${authority} not found in bootstrap authorities list`);
      }
    }
    for (const entry of this.clients) {
      if (serverConfigEqual(serverConfig, entry.serverConfig)) {
        return entry.client;
      }
    }
    const client = new XdsSingleServerClient(this, bootstrapInfo.node, serverConfig);
    this.clients.push({client, serverConfig});
    return client;
  }

  /**
   * Returns the existing client whose server configuration equals `server`,
   * or undefined if there is none. Never creates a client.
   */
  private getClient(server: XdsServerConfig) {
    for (const entry of this.clients) {
      if (serverConfigEqual(server, entry.serverConfig)) {
        return entry.client;
      }
    }
    return undefined;
  }

  /**
   * Looks up a resource type previously registered through `watchResource`.
   *
   * @param typeUrl Type URL or full type URL of the resource type.
   * @returns The registered type, or undefined if it has not been seen.
   */
  getResourceType(typeUrl: string) {
    return this.typeRegistry.get(typeUrl);
  }

  /**
   * Registers `watcher` for the named resource, creating the authority state,
   * per-type map and resource entry as needed. The first watcher of a resource
   * triggers a subscription on the authority's client. If a resource value is
   * already cached, it is delivered to the watcher on the next tick.
   *
   * @param type Resource type; registered by type URL on first use.
   * @param name Resource name, parsed into an authority and a key.
   * @param watcher Watcher to notify.
   * @throws Error if a different type object is already registered under the
   * same type URL, or if the resource's authority is unknown.
   */
  watchResource(type: XdsResourceType, name: string, watcher: ResourceWatcherInterface) {
    trace('watchResource(type=' + type.getTypeUrl() + ', name=' + name + ')');
    if (this.typeRegistry.has(type.getTypeUrl())) {
      if (this.typeRegistry.get(type.getTypeUrl()) !== type) {
        throw new Error(`Resource type does not match previously used type with the same type URL: ${type.getTypeUrl()}`);
      }
    } else {
      this.typeRegistry.set(type.getTypeUrl(), type);
      this.typeRegistry.set(type.getFullTypeUrl(), type);
    }
    const resourceName = parseXdsResourceName(name, type.getTypeUrl());
    let authorityState = this.authorityStateMap.get(resourceName.authority);
    if (!authorityState) {
      authorityState = {
        client: this.getOrCreateClient(resourceName.authority),
        resourceMap: new Map()
      };
      // Released again in cancelResourceWatch when the authority becomes empty.
      authorityState.client.ref();
      this.authorityStateMap.set(resourceName.authority, authorityState);
    }
    let keyMap = authorityState.resourceMap.get(type);
    if (!keyMap) {
      keyMap = new Map();
      authorityState.resourceMap.set(type, keyMap);
    }
    let entry = keyMap.get(resourceName.key);
    let isNewSubscription = false;
    if (!entry) {
      isNewSubscription = true;
      entry = {
        watchers: new Set(),
        cachedResource: null,
        deletionIgnored: false,
        meta: {
          clientStatus: 'REQUESTED'
        }
      };
      keyMap.set(resourceName.key, entry);
    }
    entry.watchers.add(watcher);
    if (entry.cachedResource) {
      process.nextTick(() => {
        /* Re-read the cache when the callback runs, so the watcher gets the
         * value that is current at that point. */
        if (entry?.cachedResource) {
          watcher.onGenericResourceChanged(entry.cachedResource);
        }
      });
    }
    if (isNewSubscription) {
      authorityState.client.subscribe(type, resourceName);
    }
  }

  /**
   * Removes `watcher` from the named resource. When the last watcher of a
   * resource goes away, the resource is unsubscribed and its entry removed;
   * empty per-type maps are removed as well, and when the authority has no
   * resources left its client reference is released and the authority state
   * is dropped. Unknown authorities and resources are ignored.
   *
   * @param type Resource type the watch was registered with.
   * @param name Resource name the watch was registered with.
   * @param watcher Watcher to remove.
   */
  cancelResourceWatch(type: XdsResourceType, name: string, watcher: ResourceWatcherInterface) {
    trace('cancelResourceWatch(type=' + type.getTypeUrl() + ', name=' + name + ')');
    const resourceName = parseXdsResourceName(name, type.getTypeUrl());
    const authorityState = this.authorityStateMap.get(resourceName.authority);
    if (!authorityState) {
      return;
    }
    const entry = authorityState.resourceMap.get(type)?.get(resourceName.key);
    if (entry) {
      entry.watchers.delete(watcher);
      if (entry.watchers.size === 0) {
        authorityState.resourceMap.get(type)!.delete(resourceName.key);
        authorityState.client.unsubscribe(type, resourceName);
        if (authorityState.resourceMap.get(type)!.size === 0) {
          authorityState.resourceMap.delete(type);
          if (authorityState.resourceMap.size === 0) {
            authorityState.client.unref();
            this.authorityStateMap.delete(resourceName.authority);
          }
        }
      }
    }
  }

  /**
   * Returns a drop stats recorder for the cluster from the client for
   * `lrsServer`. If no client exists for that server, the returned recorder
   * does nothing.
   */
  addClusterDropStats(lrsServer: XdsServerConfig, clusterName: string, edsServiceName: string): XdsClusterDropStats {
    const client = this.getClient(lrsServer);
    if (!client) {
      return {
        addUncategorizedCallDropped: () => {},
        addCallDropped: (category) => {},
      };
    }
    return client.addClusterDropStats(clusterName, edsServiceName);
  }

  /**
   * Forwards the removal to the client for `lrsServer`; does nothing if no
   * such client exists.
   */
  removeClusterDropStats(lrsServer: XdsServerConfig, clusterName: string, edsServiceName: string) {
    this.getClient(lrsServer)?.removeClusterDropStats(clusterName, edsServiceName);
  }

  /**
   * Returns a per-locality call stats recorder for the cluster from the client
   * for `lrsServer`. If no client exists for that server, the returned
   * recorder does nothing.
   */
  addClusterLocalityStats(lrsServer: XdsServerConfig, clusterName: string, edsServiceName: string, locality: Locality__Output): XdsClusterLocalityStats {
    const client = this.getClient(lrsServer);
    if (!client) {
      return {
        addCallStarted: () => {},
        addCallFinished: (fail) => {},
      };
    }
    return client.addClusterLocalityStats(clusterName, edsServiceName, locality);
  }

  /**
   * Forwards the removal to the client for `lrsServer`; does nothing if no
   * such client exists.
   */
  removeClusterLocalityStats(lrsServer: XdsServerConfig, clusterName: string, edsServiceName: string, locality: Locality__Output) {
    this.getClient(lrsServer)?.removeClusterLocalityStats(clusterName, edsServiceName, locality);
  }

  /**
   * Returns the certificate provider registered under `instanceName`. On the
   * first call, a provider is created for every entry in the bootstrap
   * `certificateProviders`.
   *
   * @param instanceName Certificate provider instance name from the bootstrap info.
   * @returns The provider, or undefined if the bootstrap info has no such instance.
   * @throws Error if a bootstrap entry uses an unsupported plugin name.
   */
  getCertificateProvider(instanceName: string): CertificateProvider | undefined {
    if (!this.certificateProviderRegistryPopulated) {
      for (const [name, config] of Object.entries(this.getBootstrapInfo().certificateProviders)) {
        this.certificateProviderRegistry.set(name, createCertificateProvider(config));
      }
      this.certificateProviderRegistryPopulated = true;
    }
    return this.certificateProviderRegistry.get(instanceName);
  }
}
/** Process-wide client instance; stays null until first requested. */
let singletonXdsClient: XdsClient | null = null;

/**
 * Returns the shared `XdsClient`, constructing it without a bootstrap
 * override on the first call and reusing that instance afterwards.
 */
export function getSingletonXdsClient(): XdsClient {
  if (singletonXdsClient !== null) {
    return singletonXdsClient;
  }
  const created = new XdsClient();
  singletonXdsClient = created;
  return created;
}