mirror of https://github.com/grpc/grpc-node.git
grpc-js: Implement federation support
This commit is contained in:
parent
1e9c766bc1
commit
546696c366
|
@ -61,6 +61,7 @@ const cleanAll = gulp.parallel(clean);
|
|||
const compile = checkTask(() => execNpmCommand('compile'));
|
||||
|
||||
const runTests = checkTask(() => {
|
||||
process.env.GRPC_EXPERIMENTAL_XDS_FEDERATION = 'true';
|
||||
return gulp.src(`${outDir}/test/**/*.js`)
|
||||
.pipe(mocha({reporter: 'mocha-jenkins-reporter',
|
||||
require: ['ts-node/register']}));
|
||||
|
|
|
@ -17,4 +17,5 @@
|
|||
|
||||
export const EXPERIMENTAL_FAULT_INJECTION = (process.env.GRPC_XDS_EXPERIMENTAL_FAULT_INJECTION ?? 'true') === 'true';
|
||||
export const EXPERIMENTAL_OUTLIER_DETECTION = (process.env.GRPC_EXPERIMENTAL_ENABLE_OUTLIER_DETECTION ?? 'true') === 'true';
|
||||
export const EXPERIMENTAL_RETRY = (process.env.GRPC_XDS_EXPERIMENTAL_ENABLE_RETRY ?? 'true') === 'true';
|
||||
export const EXPERIMENTAL_RETRY = (process.env.GRPC_XDS_EXPERIMENTAL_ENABLE_RETRY ?? 'true') === 'true';
|
||||
export const EXPERIMENTAL_FEDERATION = (process.env.GRPC_EXPERIMENTAL_XDS_FEDERATION ?? 'false') === 'true';
|
|
@ -16,7 +16,7 @@
|
|||
*/
|
||||
|
||||
import { connectivityState, status, Metadata, logVerbosity, experimental } from '@grpc/grpc-js';
|
||||
import { getSingletonXdsClient, XdsClient } from './xds-client';
|
||||
import { getSingletonXdsClient, XdsSingleServerClient } from './xds-client';
|
||||
import { Cluster__Output } from './generated/envoy/config/cluster/v3/Cluster';
|
||||
import SubchannelAddress = experimental.SubchannelAddress;
|
||||
import UnavailablePicker = experimental.UnavailablePicker;
|
||||
|
@ -216,7 +216,7 @@ export class CdsLoadBalancer implements LoadBalancer {
|
|||
|
||||
private latestConfig: CdsLoadBalancingConfig | null = null;
|
||||
private latestAttributes: { [key: string]: unknown } = {};
|
||||
private xdsClient: XdsClient | null = null;
|
||||
private xdsClient: XdsSingleServerClient | null = null;
|
||||
|
||||
private clusterTree: ClusterTree = {};
|
||||
|
||||
|
@ -315,7 +315,7 @@ export class CdsLoadBalancer implements LoadBalancer {
|
|||
}
|
||||
trace('Received update with config ' + JSON.stringify(lbConfig, undefined, 2));
|
||||
this.latestAttributes = attributes;
|
||||
this.xdsClient = attributes.xdsClient as XdsClient;
|
||||
this.xdsClient = attributes.xdsClient as XdsSingleServerClient;
|
||||
|
||||
/* If the cluster is changing, disable the old watcher before adding the new
|
||||
* one */
|
||||
|
|
|
@ -17,7 +17,7 @@
|
|||
|
||||
import { connectivityState as ConnectivityState, StatusObject, status as Status, experimental } from '@grpc/grpc-js';
|
||||
import { Locality__Output } from './generated/envoy/config/core/v3/Locality';
|
||||
import { XdsClusterLocalityStats, XdsClient, getSingletonXdsClient } from './xds-client';
|
||||
import { XdsClusterLocalityStats, XdsSingleServerClient, getSingletonXdsClient } from './xds-client';
|
||||
import LoadBalancer = experimental.LoadBalancer;
|
||||
import ChannelControlHelper = experimental.ChannelControlHelper;
|
||||
import registerLoadBalancerType = experimental.registerLoadBalancerType;
|
||||
|
@ -169,7 +169,7 @@ export class LrsLoadBalancer implements LoadBalancer {
|
|||
if (!(lbConfig instanceof LrsLoadBalancingConfig)) {
|
||||
return;
|
||||
}
|
||||
this.localityStatsReporter = (attributes.xdsClient as XdsClient).addClusterLocalityStats(
|
||||
this.localityStatsReporter = (attributes.xdsClient as XdsSingleServerClient).addClusterLocalityStats(
|
||||
lbConfig.getLrsLoadReportingServerName(),
|
||||
lbConfig.getClusterName(),
|
||||
lbConfig.getEdsServiceName(),
|
||||
|
|
|
@ -16,7 +16,7 @@
|
|||
*/
|
||||
|
||||
import { experimental, logVerbosity, status as Status, Metadata, connectivityState } from "@grpc/grpc-js";
|
||||
import { getSingletonXdsClient, XdsClient, XdsClusterDropStats } from "./xds-client";
|
||||
import { getSingletonXdsClient, XdsSingleServerClient, XdsClusterDropStats } from "./xds-client";
|
||||
|
||||
import LoadBalancingConfig = experimental.LoadBalancingConfig;
|
||||
import validateLoadBalancingConfig = experimental.validateLoadBalancingConfig;
|
||||
|
@ -222,7 +222,7 @@ class XdsClusterImplBalancer implements LoadBalancer {
|
|||
private childBalancer: ChildLoadBalancerHandler;
|
||||
private latestConfig: XdsClusterImplLoadBalancingConfig | null = null;
|
||||
private clusterDropStats: XdsClusterDropStats | null = null;
|
||||
private xdsClient: XdsClient | null = null;
|
||||
private xdsClient: XdsSingleServerClient | null = null;
|
||||
|
||||
constructor(private readonly channelControlHelper: ChannelControlHelper) {
|
||||
this.childBalancer = new ChildLoadBalancerHandler(createChildChannelControlHelper(channelControlHelper, {
|
||||
|
@ -243,7 +243,7 @@ class XdsClusterImplBalancer implements LoadBalancer {
|
|||
}
|
||||
trace('Received update with config: ' + JSON.stringify(lbConfig, undefined, 2));
|
||||
this.latestConfig = lbConfig;
|
||||
this.xdsClient = attributes.xdsClient as XdsClient;
|
||||
this.xdsClient = attributes.xdsClient as XdsSingleServerClient;
|
||||
|
||||
if (lbConfig.getLrsLoadReportingServerName()) {
|
||||
this.clusterDropStats = this.xdsClient.addClusterDropStats(
|
||||
|
|
|
@ -23,7 +23,7 @@ import { ClusterLoadAssignment__Output } from "./generated/envoy/config/endpoint
|
|||
import { LrsLoadBalancingConfig } from "./load-balancer-lrs";
|
||||
import { LocalitySubchannelAddress, PriorityChild, PriorityLoadBalancingConfig } from "./load-balancer-priority";
|
||||
import { WeightedTarget, WeightedTargetLoadBalancingConfig } from "./load-balancer-weighted-target";
|
||||
import { getSingletonXdsClient, XdsClient } from "./xds-client";
|
||||
import { getSingletonXdsClient, XdsSingleServerClient } from "./xds-client";
|
||||
import { DropCategory, XdsClusterImplLoadBalancingConfig } from "./load-balancer-xds-cluster-impl";
|
||||
import { Watcher } from "./xds-stream-state/xds-stream-state";
|
||||
|
||||
|
@ -243,7 +243,7 @@ export class XdsClusterResolver implements LoadBalancer {
|
|||
private discoveryMechanismList: DiscoveryMechanismEntry[] = [];
|
||||
private latestConfig: XdsClusterResolverLoadBalancingConfig | null = null;
|
||||
private latestAttributes: { [key: string]: unknown; } = {};
|
||||
private xdsClient: XdsClient | null = null;
|
||||
private xdsClient: XdsSingleServerClient | null = null;
|
||||
private childBalancer: ChildLoadBalancerHandler;
|
||||
|
||||
constructor(private readonly channelControlHelper: ChannelControlHelper) {
|
||||
|
@ -368,7 +368,7 @@ export class XdsClusterResolver implements LoadBalancer {
|
|||
trace('Received update with config ' + JSON.stringify(lbConfig, undefined, 2));
|
||||
this.latestConfig = lbConfig;
|
||||
this.latestAttributes = attributes;
|
||||
this.xdsClient = attributes.xdsClient as XdsClient;
|
||||
this.xdsClient = attributes.xdsClient as XdsSingleServerClient;
|
||||
if (this.discoveryMechanismList.length === 0) {
|
||||
for (const mechanism of lbConfig.getDiscoveryMechanisms()) {
|
||||
const mechanismEntry: DiscoveryMechanismEntry = {
|
||||
|
|
|
@ -18,7 +18,7 @@ import * as protoLoader from '@grpc/proto-loader';
|
|||
|
||||
import { RE2 } from 're2-wasm';
|
||||
|
||||
import { getSingletonXdsClient, XdsClient } from './xds-client';
|
||||
import { getSingletonXdsClient, XdsSingleServerClient } from './xds-client';
|
||||
import { StatusObject, status, logVerbosity, Metadata, experimental, ChannelOptions } from '@grpc/grpc-js';
|
||||
import Resolver = experimental.Resolver;
|
||||
import GrpcUri = experimental.GrpcUri;
|
||||
|
@ -44,7 +44,7 @@ import { decodeSingleResource, HTTP_CONNECTION_MANGER_TYPE_URL } from './resourc
|
|||
import Duration = experimental.Duration;
|
||||
import { Duration__Output } from './generated/google/protobuf/Duration';
|
||||
import { createHttpFilter, HttpFilterConfig, parseOverrideFilterConfig, parseTopLevelFilterConfig } from './http-filter';
|
||||
import { EXPERIMENTAL_FAULT_INJECTION, EXPERIMENTAL_RETRY } from './environment';
|
||||
import { EXPERIMENTAL_FAULT_INJECTION, EXPERIMENTAL_FEDERATION, EXPERIMENTAL_RETRY } from './environment';
|
||||
import Filter = experimental.Filter;
|
||||
import FilterFactory = experimental.FilterFactory;
|
||||
import RetryPolicy = experimental.RetryPolicy;
|
||||
|
@ -211,11 +211,24 @@ function getDefaultRetryMaxInterval(baseInterval: string): string {
|
|||
return `${Number.parseFloat(baseInterval.substring(0, baseInterval.length - 1)) * 10}s`;
|
||||
}
|
||||
|
||||
function formatTemplateString(templateString: string, value: string): string {
|
||||
return templateString.replace(/%s/g, value);
|
||||
/**
|
||||
* Encode a text string as a valid path of a URI, as specified in RFC-3986 section 3.3
|
||||
* @param uriPath A value representing an unencoded URI path
|
||||
* @returns
|
||||
*/
|
||||
function encodeURIPath(uriPath: string): string {
|
||||
return uriPath.replace(/[^A-Za-z0-9._~!$&^()*+,;=/-]/g, substring => encodeURIComponent(substring));
|
||||
}
|
||||
|
||||
function getListenerResourceName(bootstrapConfig: BootstrapInfo, target: GrpcUri): string {
|
||||
function formatTemplateString(templateString: string, value: string): string {
|
||||
if (templateString.startsWith('xdstp:')) {
|
||||
return templateString.replace(/%s/g, encodeURIPath(value));
|
||||
} else {
|
||||
return templateString.replace(/%s/g, value);
|
||||
}
|
||||
}
|
||||
|
||||
export function getListenerResourceName(bootstrapConfig: BootstrapInfo, target: GrpcUri): string {
|
||||
if (target.authority) {
|
||||
if (target.authority in bootstrapConfig.authorities) {
|
||||
return formatTemplateString(bootstrapConfig.authorities[target.authority].clientListenerResourceNameTemplate, target.path);
|
||||
|
@ -258,7 +271,9 @@ class XdsResolver implements Resolver {
|
|||
|
||||
private ldsHttpFilterConfigs: {name: string, config: HttpFilterConfig}[] = [];
|
||||
|
||||
private xdsClient: XdsClient;
|
||||
private bootstrapInfo: BootstrapInfo | null = null;
|
||||
|
||||
private xdsClient: XdsSingleServerClient;
|
||||
|
||||
constructor(
|
||||
private target: GrpcUri,
|
||||
|
@ -267,8 +282,8 @@ class XdsResolver implements Resolver {
|
|||
) {
|
||||
if (channelOptions[BOOTSTRAP_CONFIG_KEY]) {
|
||||
const parsedConfig = JSON.parse(channelOptions[BOOTSTRAP_CONFIG_KEY]);
|
||||
const validatedConfig = validateBootstrapConfig(parsedConfig);
|
||||
this.xdsClient = new XdsClient(validatedConfig);
|
||||
this.bootstrapInfo = validateBootstrapConfig(parsedConfig);
|
||||
this.xdsClient = new XdsSingleServerClient(this.bootstrapInfo);
|
||||
} else {
|
||||
this.xdsClient = getSingletonXdsClient();
|
||||
}
|
||||
|
@ -588,25 +603,40 @@ class XdsResolver implements Resolver {
|
|||
});
|
||||
}
|
||||
|
||||
updateResolution(): void {
|
||||
// Wait until updateResolution is called once to start the xDS requests
|
||||
loadBootstrapInfo().then((bootstrapInfo) => {
|
||||
if (!this.isLdsWatcherActive) {
|
||||
trace('Starting resolution for target ' + uriToString(this.target));
|
||||
try {
|
||||
this.listenerResourceName = getListenerResourceName(bootstrapInfo, this.target);
|
||||
trace('Resolving target ' + uriToString(this.target) + ' with Listener resource name ' + this.listenerResourceName);
|
||||
this.xdsClient.addListenerWatcher(this.listenerResourceName, this.ldsWatcher);
|
||||
this.isLdsWatcherActive = true;
|
||||
private startResolution(): void {
|
||||
if (!this.isLdsWatcherActive) {
|
||||
trace('Starting resolution for target ' + uriToString(this.target));
|
||||
try {
|
||||
this.listenerResourceName = getListenerResourceName(this.bootstrapInfo!, this.target);
|
||||
trace('Resolving target ' + uriToString(this.target) + ' with Listener resource name ' + this.listenerResourceName);
|
||||
this.xdsClient.addListenerWatcher(this.listenerResourceName, this.ldsWatcher);
|
||||
this.isLdsWatcherActive = true;
|
||||
|
||||
} catch (e) {
|
||||
this.reportResolutionError(e.message);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
updateResolution(): void {
|
||||
if (EXPERIMENTAL_FEDERATION) {
|
||||
if (this.bootstrapInfo) {
|
||||
this.startResolution();
|
||||
} else {
|
||||
try {
|
||||
this.bootstrapInfo = loadBootstrapInfo();
|
||||
} catch (e) {
|
||||
this.reportResolutionError(e.message);
|
||||
}
|
||||
this.startResolution();
|
||||
}
|
||||
|
||||
}, (error) => {
|
||||
this.reportResolutionError(`${error}`);
|
||||
})
|
||||
} else {
|
||||
if (!this.isLdsWatcherActive) {
|
||||
trace('Starting resolution for target ' + uriToString(this.target));
|
||||
this.xdsClient.addListenerWatcher(this.target.path, this.ldsWatcher);
|
||||
this.isLdsWatcherActive = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
destroy() {
|
||||
|
|
|
@ -16,6 +16,7 @@
|
|||
*/
|
||||
|
||||
import * as fs from 'fs';
|
||||
import { EXPERIMENTAL_FEDERATION } from './environment';
|
||||
import { Struct } from './generated/google/protobuf/Struct';
|
||||
import { Value } from './generated/google/protobuf/Value';
|
||||
|
||||
|
@ -47,7 +48,7 @@ export interface XdsServerConfig {
|
|||
|
||||
export interface Authority {
|
||||
clientListenerResourceNameTemplate: string;
|
||||
xdsServers: XdsServerConfig[];
|
||||
xdsServers?: XdsServerConfig[];
|
||||
}
|
||||
|
||||
export interface BootstrapInfo {
|
||||
|
@ -238,16 +239,60 @@ function validateNode(obj: any): Node {
|
|||
return result;
|
||||
}
|
||||
|
||||
export function validateBootstrapConfig(obj: any): BootstrapInfo {
|
||||
function validateAuthority(obj: any, authorityName: string): Authority {
|
||||
if ('client_listener_resource_name_template' in obj) {
|
||||
if (typeof obj.client_listener_resource_name_template !== 'string') {
|
||||
throw new Error(`authorities[${authorityName}].client_listener_resource_name_template: expected string, got ${typeof obj.client_listener_resource_name_template}`);
|
||||
}
|
||||
if (!obj.client_listener_resource_name_template.startsWith(`xdstp://${authorityName}/`)) {
|
||||
throw new Error(`authorities[${authorityName}].client_listener_resource_name_template must start with "xdstp://${authorityName}/"`);
|
||||
}
|
||||
}
|
||||
return {
|
||||
xdsServers: obj.xds_servers.map(validateXdsServerConfig),
|
||||
node: validateNode(obj.node),
|
||||
clientListenerResourceNameTemplate: obj.client_listener_resource_name_template ?? `xdstp://${authorityName}/envoy.config.listener.v3.Listener/%s`,
|
||||
xdsServers: obj.xds_servers?.map(validateXdsServerConfig)
|
||||
};
|
||||
}
|
||||
|
||||
let loadedBootstrapInfo: Promise<BootstrapInfo> | null = null;
|
||||
function validateAuthoritiesMap(obj: any): {[authorityName: string]: Authority} {
|
||||
if (!obj) {
|
||||
return {};
|
||||
}
|
||||
const result: {[authorityName: string]: Authority} = {};
|
||||
for (const [name, authority] of Object.entries(obj)) {
|
||||
result[name] = validateAuthority(authority, name);
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
export async function loadBootstrapInfo(): Promise<BootstrapInfo> {
|
||||
export function validateBootstrapConfig(obj: any): BootstrapInfo {
|
||||
const xdsServers = obj.xds_servers.map(validateXdsServerConfig);
|
||||
const node = validateNode(obj.node);
|
||||
if (EXPERIMENTAL_FEDERATION) {
|
||||
if ('client_default_listener_resource_name_template' in obj) {
|
||||
if (typeof obj.client_default_listener_resource_name_template !== 'string') {
|
||||
throw new Error(`client_default_listener_resource_name_template: expected string, got ${typeof obj.client_default_listener_resource_name_template}`);
|
||||
}
|
||||
}
|
||||
return {
|
||||
xdsServers: xdsServers,
|
||||
node: node,
|
||||
authorities: validateAuthoritiesMap(obj.authorities),
|
||||
clientDefaultListenerResourceNameTemplate: obj.client_default_listener_resource_name_template ?? '%s'
|
||||
};
|
||||
} else {
|
||||
return {
|
||||
xdsServers: xdsServers,
|
||||
node: node,
|
||||
authorities: {},
|
||||
clientDefaultListenerResourceNameTemplate: '%s'
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
let loadedBootstrapInfo: BootstrapInfo | null = null;
|
||||
|
||||
export function loadBootstrapInfo(): BootstrapInfo {
|
||||
if (loadedBootstrapInfo !== null) {
|
||||
return loadedBootstrapInfo;
|
||||
}
|
||||
|
@ -261,28 +306,19 @@ export async function loadBootstrapInfo(): Promise<BootstrapInfo> {
|
|||
*/
|
||||
const bootstrapPath = process.env.GRPC_XDS_BOOTSTRAP;
|
||||
if (bootstrapPath) {
|
||||
loadedBootstrapInfo = new Promise((resolve, reject) => {
|
||||
fs.readFile(bootstrapPath, { encoding: 'utf8' }, (err, data) => {
|
||||
if (err) {
|
||||
reject(
|
||||
new Error(
|
||||
`Failed to read xDS bootstrap file from path ${bootstrapPath} with error ${err.message}`
|
||||
)
|
||||
);
|
||||
}
|
||||
try {
|
||||
const parsedFile = JSON.parse(data);
|
||||
resolve(validateBootstrapConfig(parsedFile));
|
||||
} catch (e) {
|
||||
reject(
|
||||
new Error(
|
||||
`Failed to parse xDS bootstrap file at path ${bootstrapPath} with error ${e.message}`
|
||||
)
|
||||
);
|
||||
}
|
||||
});
|
||||
});
|
||||
return loadedBootstrapInfo;
|
||||
let rawBootstrap: string;
|
||||
try {
|
||||
rawBootstrap = fs.readFileSync(bootstrapPath, { encoding: 'utf8'});
|
||||
} catch (e) {
|
||||
throw new Error(`Failed to read xDS bootstrap file from path ${bootstrapPath} with error ${e.message}`);
|
||||
}
|
||||
try {
|
||||
const parsedFile = JSON.parse(rawBootstrap);
|
||||
loadedBootstrapInfo = validateBootstrapConfig(parsedFile);
|
||||
return loadedBootstrapInfo;
|
||||
} catch (e) {
|
||||
throw new Error(`Failed to parse xDS bootstrap file at path ${bootstrapPath} with error ${e.message}`)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -297,8 +333,7 @@ export async function loadBootstrapInfo(): Promise<BootstrapInfo> {
|
|||
if (bootstrapConfig) {
|
||||
try {
|
||||
const parsedConfig = JSON.parse(bootstrapConfig);
|
||||
const loadedBootstrapInfoValue = validateBootstrapConfig(parsedConfig);
|
||||
loadedBootstrapInfo = Promise.resolve(loadedBootstrapInfoValue);
|
||||
loadedBootstrapInfo = validateBootstrapConfig(parsedConfig);
|
||||
} catch (e) {
|
||||
throw new Error(
|
||||
`Failed to parse xDS bootstrap config from environment variable GRPC_XDS_BOOTSTRAP_CONFIG with error ${e.message}`
|
||||
|
@ -308,9 +343,8 @@ export async function loadBootstrapInfo(): Promise<BootstrapInfo> {
|
|||
return loadedBootstrapInfo;
|
||||
}
|
||||
|
||||
return Promise.reject(
|
||||
new Error(
|
||||
'The GRPC_XDS_BOOTSTRAP or GRPC_XDS_BOOTSTRAP_CONFIG environment variables need to be set to the path to the bootstrap file to use xDS'
|
||||
)
|
||||
|
||||
throw new Error(
|
||||
'The GRPC_XDS_BOOTSTRAP or GRPC_XDS_BOOTSTRAP_CONFIG environment variables need to be set to the path to the bootstrap file to use xDS'
|
||||
);
|
||||
}
|
||||
|
|
|
@ -21,7 +21,7 @@ import { loadProtosWithOptionsSync } from '@grpc/proto-loader/build/src/util';
|
|||
import { loadPackageDefinition, StatusObject, status, logVerbosity, Metadata, experimental, ChannelOptions, ClientDuplexStream, ServiceError, ChannelCredentials, Channel, connectivityState } from '@grpc/grpc-js';
|
||||
import * as adsTypes from './generated/ads';
|
||||
import * as lrsTypes from './generated/lrs';
|
||||
import { BootstrapInfo, loadBootstrapInfo } from './xds-bootstrap';
|
||||
import { BootstrapInfo, loadBootstrapInfo, XdsServerConfig } from './xds-bootstrap';
|
||||
import { Node } from './generated/envoy/config/core/v3/Node';
|
||||
import { AggregatedDiscoveryServiceClient } from './generated/envoy/service/discovery/v3/AggregatedDiscoveryService';
|
||||
import { DiscoveryRequest } from './generated/envoy/service/discovery/v3/DiscoveryRequest';
|
||||
|
@ -56,18 +56,14 @@ function trace(text: string): void {
|
|||
|
||||
const clientVersion = require('../../package.json').version;
|
||||
|
||||
let loadedProtos: Promise<
|
||||
adsTypes.ProtoGrpcType & lrsTypes.ProtoGrpcType
|
||||
> | null = null;
|
||||
let loadedProtos: adsTypes.ProtoGrpcType & lrsTypes.ProtoGrpcType | null = null;
|
||||
|
||||
function loadAdsProtos(): Promise<
|
||||
adsTypes.ProtoGrpcType & lrsTypes.ProtoGrpcType
|
||||
> {
|
||||
function loadAdsProtos(): adsTypes.ProtoGrpcType & lrsTypes.ProtoGrpcType {
|
||||
if (loadedProtos !== null) {
|
||||
return loadedProtos;
|
||||
}
|
||||
loadedProtos = protoLoader
|
||||
.load(
|
||||
return (loadPackageDefinition(protoLoader
|
||||
.loadSync(
|
||||
[
|
||||
'envoy/service/discovery/v3/ads.proto',
|
||||
'envoy/service/load_stats/v3/lrs.proto',
|
||||
|
@ -87,14 +83,7 @@ function loadAdsProtos(): Promise<
|
|||
__dirname + '/../../deps/protoc-gen-validate/',
|
||||
],
|
||||
}
|
||||
)
|
||||
.then(
|
||||
(packageDefinition) =>
|
||||
(loadPackageDefinition(
|
||||
packageDefinition
|
||||
) as unknown) as adsTypes.ProtoGrpcType & lrsTypes.ProtoGrpcType
|
||||
);
|
||||
return loadedProtos;
|
||||
)) as unknown) as adsTypes.ProtoGrpcType & lrsTypes.ProtoGrpcType;
|
||||
}
|
||||
|
||||
function localityEqual(
|
||||
|
@ -247,9 +236,12 @@ function getResponseMessages<T extends AdsTypeUrl>(
|
|||
return result;
|
||||
}
|
||||
|
||||
export class XdsClient {
|
||||
class XdsSingleServerClient {
|
||||
|
||||
private adsNode: Node | null = null;
|
||||
private adsNode: Node;
|
||||
/* These client objects need to be nullable so that they can be shut down
|
||||
* when not in use. If the channel could enter the IDLE state it would remove
|
||||
* that need. */
|
||||
private adsClient: AggregatedDiscoveryServiceClient | null = null;
|
||||
private adsCall: ClientDuplexStream<
|
||||
DiscoveryRequest,
|
||||
|
@ -257,7 +249,7 @@ export class XdsClient {
|
|||
> | null = null;
|
||||
private receivedAdsResponseOnCurrentStream = false;
|
||||
|
||||
private lrsNode: Node | null = null;
|
||||
private lrsNode: Node;
|
||||
private lrsClient: LoadReportingServiceClient | null = null;
|
||||
private lrsCall: ClientDuplexStream<
|
||||
LoadStatsRequest,
|
||||
|
@ -269,14 +261,12 @@ export class XdsClient {
|
|||
private clusterStatsMap: ClusterLoadReportMap = new ClusterLoadReportMap();
|
||||
private statsTimer: NodeJS.Timer;
|
||||
|
||||
private hasShutdown = false;
|
||||
|
||||
private adsState: AdsState;
|
||||
|
||||
private adsBackoff: BackoffTimeout;
|
||||
private lrsBackoff: BackoffTimeout;
|
||||
|
||||
constructor(bootstrapInfoOverride?: BootstrapInfo) {
|
||||
constructor(bootstrapNode: Node, private xdsServerConfig: XdsServerConfig) {
|
||||
const edsState = new EdsState(() => {
|
||||
this.updateNames('eds');
|
||||
});
|
||||
|
@ -296,11 +286,6 @@ export class XdsClient {
|
|||
lds: ldsState,
|
||||
};
|
||||
|
||||
const channelArgs = {
|
||||
// 5 minutes
|
||||
'grpc.keepalive_time_ms': 5 * 60 * 1000
|
||||
}
|
||||
|
||||
this.adsBackoff = new BackoffTimeout(() => {
|
||||
this.maybeStartAdsStream();
|
||||
});
|
||||
|
@ -309,103 +294,32 @@ export class XdsClient {
|
|||
this.maybeStartLrsStream();
|
||||
});
|
||||
this.lrsBackoff.unref();
|
||||
|
||||
async function getBootstrapInfo(): Promise<BootstrapInfo> {
|
||||
if (bootstrapInfoOverride) {
|
||||
return bootstrapInfoOverride;
|
||||
} else {
|
||||
return loadBootstrapInfo();
|
||||
}
|
||||
}
|
||||
|
||||
Promise.all([getBootstrapInfo(), loadAdsProtos()]).then(
|
||||
([bootstrapInfo, protoDefinitions]) => {
|
||||
if (this.hasShutdown) {
|
||||
return;
|
||||
}
|
||||
trace('Loaded bootstrap info: ' + JSON.stringify(bootstrapInfo, undefined, 2));
|
||||
if (bootstrapInfo.xdsServers.length < 1) {
|
||||
trace('Failed to initialize xDS Client. No servers provided in bootstrap info.');
|
||||
// Bubble this error up to any listeners
|
||||
this.reportStreamError({
|
||||
code: status.INTERNAL,
|
||||
details: 'Failed to initialize xDS Client. No servers provided in bootstrap info.',
|
||||
metadata: new Metadata(),
|
||||
});
|
||||
return;
|
||||
}
|
||||
if (bootstrapInfo.xdsServers[0].serverFeatures.indexOf('ignore_resource_deletion') >= 0) {
|
||||
this.adsState.lds.enableIgnoreResourceDeletion();
|
||||
this.adsState.cds.enableIgnoreResourceDeletion();
|
||||
}
|
||||
const userAgentName = 'gRPC Node Pure JS';
|
||||
this.adsNode = {
|
||||
...bootstrapInfo.node,
|
||||
user_agent_name: userAgentName,
|
||||
user_agent_version: clientVersion,
|
||||
client_features: ['envoy.lb.does_not_support_overprovisioning'],
|
||||
};
|
||||
this.lrsNode = {
|
||||
...bootstrapInfo.node,
|
||||
user_agent_name: userAgentName,
|
||||
user_agent_version: clientVersion,
|
||||
client_features: ['envoy.lrs.supports_send_all_clusters'],
|
||||
};
|
||||
setCsdsClientNode(this.adsNode);
|
||||
trace('ADS Node: ' + JSON.stringify(this.adsNode, undefined, 2));
|
||||
trace('LRS Node: ' + JSON.stringify(this.lrsNode, undefined, 2));
|
||||
const credentialsConfigs = bootstrapInfo.xdsServers[0].channelCreds;
|
||||
let channelCreds: ChannelCredentials | null = null;
|
||||
for (const config of credentialsConfigs) {
|
||||
if (config.type === 'google_default') {
|
||||
channelCreds = createGoogleDefaultCredentials();
|
||||
break;
|
||||
} else if (config.type === 'insecure') {
|
||||
channelCreds = ChannelCredentials.createInsecure();
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (channelCreds === null) {
|
||||
trace('Failed to initialize xDS Client. No valid credentials types found.');
|
||||
// Bubble this error up to any listeners
|
||||
this.reportStreamError({
|
||||
code: status.INTERNAL,
|
||||
details: 'Failed to initialize xDS Client. No valid credentials types found.',
|
||||
metadata: new Metadata(),
|
||||
});
|
||||
return;
|
||||
}
|
||||
const serverUri = bootstrapInfo.xdsServers[0].serverUri
|
||||
trace('Starting xDS client connected to server URI ' + bootstrapInfo.xdsServers[0].serverUri);
|
||||
const channel = new Channel(serverUri, channelCreds, channelArgs);
|
||||
this.adsClient = new protoDefinitions.envoy.service.discovery.v3.AggregatedDiscoveryService(
|
||||
serverUri,
|
||||
channelCreds,
|
||||
{channelOverride: channel}
|
||||
);
|
||||
this.maybeStartAdsStream();
|
||||
channel.watchConnectivityState(channel.getConnectivityState(false), Infinity, () => {
|
||||
this.handleAdsConnectivityStateUpdate();
|
||||
})
|
||||
|
||||
this.lrsClient = new protoDefinitions.envoy.service.load_stats.v3.LoadReportingService(
|
||||
serverUri,
|
||||
channelCreds,
|
||||
{channelOverride: channel}
|
||||
);
|
||||
this.maybeStartLrsStream();
|
||||
}).catch((error) => {
|
||||
trace('Failed to initialize xDS Client. ' + error.message);
|
||||
// Bubble this error up to any listeners
|
||||
this.reportStreamError({
|
||||
code: status.INTERNAL,
|
||||
details: `Failed to initialize xDS Client. ${error.message}`,
|
||||
metadata: new Metadata(),
|
||||
});
|
||||
}
|
||||
);
|
||||
this.statsTimer = setInterval(() => {}, 0);
|
||||
clearInterval(this.statsTimer);
|
||||
if (xdsServerConfig.serverFeatures.indexOf('ignore_resource_deletion') >= 0) {
|
||||
this.adsState.lds.enableIgnoreResourceDeletion();
|
||||
this.adsState.cds.enableIgnoreResourceDeletion();
|
||||
}
|
||||
const userAgentName = 'gRPC Node Pure JS';
|
||||
this.adsNode = {
|
||||
...bootstrapNode,
|
||||
user_agent_name: userAgentName,
|
||||
user_agent_version: clientVersion,
|
||||
client_features: ['envoy.lb.does_not_support_overprovisioning'],
|
||||
};
|
||||
this.lrsNode = {
|
||||
...bootstrapNode,
|
||||
user_agent_name: userAgentName,
|
||||
user_agent_version: clientVersion,
|
||||
client_features: ['envoy.lrs.supports_send_all_clusters'],
|
||||
};
|
||||
setCsdsClientNode(this.adsNode);
|
||||
this.trace('ADS Node: ' + JSON.stringify(this.adsNode, undefined, 2));
|
||||
this.trace('LRS Node: ' + JSON.stringify(this.lrsNode, undefined, 2));
|
||||
}
|
||||
|
||||
private trace(text: string) {
|
||||
trace(this.xdsServerConfig.serverUri + ' ' + text);
|
||||
}
|
||||
|
||||
private handleAdsConnectivityStateUpdate() {
|
||||
|
@ -471,24 +385,24 @@ export class XdsClient {
|
|||
break;
|
||||
}
|
||||
} catch (e) {
|
||||
trace('Nacking message with protobuf parsing error: ' + e.message);
|
||||
this.trace('Nacking message with protobuf parsing error: ' + e.message);
|
||||
this.nack(message.type_url, e.message);
|
||||
return;
|
||||
}
|
||||
if (handleResponseResult === null) {
|
||||
// Null handleResponseResult means that the type_url was unrecognized
|
||||
trace('Nacking message with unknown type URL ' + message.type_url);
|
||||
this.trace('Nacking message with unknown type URL ' + message.type_url);
|
||||
this.nack(message.type_url, `Unknown type_url ${message.type_url}`);
|
||||
} else {
|
||||
updateCsdsResourceResponse(message.type_url as AdsTypeUrl, message.version_info, handleResponseResult.result);
|
||||
if (handleResponseResult.result.rejected.length > 0) {
|
||||
// rejected.length > 0 means that at least one message validation failed
|
||||
const errorString = `${handleResponseResult.serviceKind.toUpperCase()} Error: ${handleResponseResult.result.rejected[0].error}`;
|
||||
trace('Nacking message with type URL ' + message.type_url + ': ' + errorString);
|
||||
this.trace('Nacking message with type URL ' + message.type_url + ': ' + errorString);
|
||||
this.nack(message.type_url, errorString);
|
||||
} else {
|
||||
// If we get here, all message validation succeeded
|
||||
trace('Acking message with type URL ' + message.type_url);
|
||||
this.trace('Acking message with type URL ' + message.type_url);
|
||||
const serviceKind = handleResponseResult.serviceKind;
|
||||
this.adsState[serviceKind].nonce = message.nonce;
|
||||
this.adsState[serviceKind].versionInfo = message.version_info;
|
||||
|
@ -497,8 +411,60 @@ export class XdsClient {
|
|||
}
|
||||
}
|
||||
|
||||
private maybeCreateClients() {
|
||||
if (this.adsClient !== null && this.lrsClient !== null) {
|
||||
return;
|
||||
}
|
||||
const channelArgs = {
|
||||
// 5 minutes
|
||||
'grpc.keepalive_time_ms': 5 * 60 * 1000
|
||||
}
|
||||
const credentialsConfigs = this.xdsServerConfig.channelCreds;
|
||||
let channelCreds: ChannelCredentials | null = null;
|
||||
for (const config of credentialsConfigs) {
|
||||
if (config.type === 'google_default') {
|
||||
channelCreds = createGoogleDefaultCredentials();
|
||||
break;
|
||||
} else if (config.type === 'insecure') {
|
||||
channelCreds = ChannelCredentials.createInsecure();
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (channelCreds === null) {
|
||||
this.trace('Failed to initialize xDS Client. No valid credentials types found.');
|
||||
// Bubble this error up to any listeners
|
||||
this.reportStreamError({
|
||||
code: status.INTERNAL,
|
||||
details: 'Failed to initialize xDS Client. No valid credentials types found.',
|
||||
metadata: new Metadata(),
|
||||
});
|
||||
return;
|
||||
}
|
||||
const serverUri = this.xdsServerConfig.serverUri
|
||||
this.trace('Starting xDS client connected to server URI ' + this.xdsServerConfig.serverUri);
|
||||
const channel = new Channel(serverUri, channelCreds, channelArgs);
|
||||
const protoDefinitions = loadAdsProtos();
|
||||
if (this.adsClient === null) {
|
||||
this.adsClient = new protoDefinitions.envoy.service.discovery.v3.AggregatedDiscoveryService(
|
||||
serverUri,
|
||||
channelCreds,
|
||||
{channelOverride: channel}
|
||||
);
|
||||
channel.watchConnectivityState(channel.getConnectivityState(false), Infinity, () => {
|
||||
this.handleAdsConnectivityStateUpdate();
|
||||
});
|
||||
}
|
||||
if (this.lrsClient === null) {
|
||||
this.lrsClient = new protoDefinitions.envoy.service.load_stats.v3.LoadReportingService(
|
||||
serverUri,
|
||||
channelCreds,
|
||||
{channelOverride: channel}
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
private handleAdsCallStatus(streamStatus: StatusObject) {
|
||||
trace(
|
||||
this.trace(
|
||||
'ADS stream ended. code=' + streamStatus.code + ' details= ' + streamStatus.details
|
||||
);
|
||||
this.adsCall = null;
|
||||
|
@ -512,29 +478,28 @@ export class XdsClient {
|
|||
}
|
||||
}
|
||||
|
||||
private hasOutstandingResourceRequests() {
|
||||
return (this.adsState.eds.getResourceNames().length > 0 ||
|
||||
this.adsState.cds.getResourceNames().length > 0 ||
|
||||
this.adsState.rds.getResourceNames().length > 0 ||
|
||||
this.adsState.lds.getResourceNames().length);
|
||||
}
|
||||
|
||||
/**
|
||||
* Start the ADS stream if the client exists and there is not already an
|
||||
* existing stream, and there are resources to request.
|
||||
*/
|
||||
private maybeStartAdsStream() {
|
||||
if (this.hasShutdown) {
|
||||
return;
|
||||
}
|
||||
if (this.adsState.eds.getResourceNames().length === 0 &&
|
||||
this.adsState.cds.getResourceNames().length === 0 &&
|
||||
this.adsState.rds.getResourceNames().length === 0 &&
|
||||
this.adsState.lds.getResourceNames().length === 0) {
|
||||
return;
|
||||
}
|
||||
if (this.adsClient === null) {
|
||||
if (!this.hasOutstandingResourceRequests()) {
|
||||
return;
|
||||
}
|
||||
if (this.adsCall !== null) {
|
||||
return;
|
||||
}
|
||||
this.maybeCreateClients();
|
||||
this.receivedAdsResponseOnCurrentStream = false;
|
||||
const metadata = new Metadata({waitForReady: true});
|
||||
this.adsCall = this.adsClient.StreamAggregatedResources(metadata);
|
||||
this.adsCall = this.adsClient!.StreamAggregatedResources(metadata);
|
||||
this.adsCall.on('data', (message: DiscoveryResponse__Output) => {
|
||||
this.handleAdsResponse(message);
|
||||
});
|
||||
|
@ -542,7 +507,7 @@ export class XdsClient {
|
|||
this.handleAdsCallStatus(status);
|
||||
});
|
||||
this.adsCall.on('error', () => {});
|
||||
trace('Started ADS stream');
|
||||
this.trace('Started ADS stream');
|
||||
// Backoff relative to when we start the request
|
||||
this.adsBackoff.runOnce();
|
||||
|
||||
|
@ -553,7 +518,7 @@ export class XdsClient {
|
|||
this.updateNames(service);
|
||||
}
|
||||
}
|
||||
if (this.adsClient.getChannel().getConnectivityState(false) === connectivityState.READY) {
|
||||
if (this.adsClient!.getChannel().getConnectivityState(false) === connectivityState.READY) {
|
||||
this.reportAdsStreamStarted();
|
||||
}
|
||||
}
|
||||
|
@ -586,7 +551,7 @@ export class XdsClient {
|
|||
* Acknowledge an update. This should be called after the local nonce and
|
||||
* version info are updated so that it sends the post-update values.
|
||||
*/
|
||||
ack(serviceKind: AdsServiceKind) {
|
||||
private ack(serviceKind: AdsServiceKind) {
|
||||
this.updateNames(serviceKind);
|
||||
}
|
||||
|
||||
|
@ -633,15 +598,20 @@ export class XdsClient {
|
|||
this.maybeSendAdsMessage(typeUrl, resourceNames, nonce, versionInfo, message);
|
||||
}
|
||||
|
||||
private shutdown() {
|
||||
this.adsCall?.end();
|
||||
this.adsCall = null;
|
||||
this.lrsCall?.end();
|
||||
this.lrsCall = null;
|
||||
this.adsClient?.close();
|
||||
this.adsClient = null;
|
||||
this.lrsClient?.close();
|
||||
this.lrsClient = null;
|
||||
}
|
||||
|
||||
private updateNames(serviceKind: AdsServiceKind) {
|
||||
if (this.adsState.eds.getResourceNames().length === 0 &&
|
||||
this.adsState.cds.getResourceNames().length === 0 &&
|
||||
this.adsState.rds.getResourceNames().length === 0 &&
|
||||
this.adsState.lds.getResourceNames().length === 0) {
|
||||
this.adsCall?.end();
|
||||
this.adsCall = null;
|
||||
this.lrsCall?.end();
|
||||
this.lrsCall = null;
|
||||
if (!this.hasOutstandingResourceRequests()) {
|
||||
this.shutdown();
|
||||
return;
|
||||
}
|
||||
this.maybeStartAdsStream();
|
||||
|
@ -653,7 +623,7 @@ export class XdsClient {
|
|||
* of getTypeUrl is garbage and everything after that is invalid. */
|
||||
return;
|
||||
}
|
||||
trace('Sending update for ' + serviceKind + ' with names ' + this.adsState[serviceKind].getResourceNames());
|
||||
this.trace('Sending update for ' + serviceKind + ' with names ' + this.adsState[serviceKind].getResourceNames());
|
||||
const typeUrl = this.getTypeUrl(serviceKind);
|
||||
updateCsdsRequestedNameList(typeUrl, this.adsState[serviceKind].getResourceNames());
|
||||
this.maybeSendAdsMessage(typeUrl, this.adsState[serviceKind].getResourceNames(), this.adsState[serviceKind].nonce, this.adsState[serviceKind].versionInfo);
|
||||
|
@ -675,7 +645,7 @@ export class XdsClient {
|
|||
}
|
||||
|
||||
private handleLrsResponse(message: LoadStatsResponse__Output) {
|
||||
trace('Received LRS response');
|
||||
this.trace('Received LRS response');
|
||||
/* Once we get any response from the server, we assume that the stream is
|
||||
* in a good state, so we can reset the backoff timer. */
|
||||
this.lrsBackoff.reset();
|
||||
|
@ -694,7 +664,7 @@ export class XdsClient {
|
|||
const loadReportingIntervalMs =
|
||||
Number.parseInt(message.load_reporting_interval!.seconds) * 1000 +
|
||||
message.load_reporting_interval!.nanos / 1_000_000;
|
||||
trace('Received LRS response with load reporting interval ' + loadReportingIntervalMs + ' ms');
|
||||
this.trace('Received LRS response with load reporting interval ' + loadReportingIntervalMs + ' ms');
|
||||
this.statsTimer = setInterval(() => {
|
||||
this.sendStats();
|
||||
}, loadReportingIntervalMs);
|
||||
|
@ -704,7 +674,7 @@ export class XdsClient {
|
|||
}
|
||||
|
||||
private handleLrsCallStatus(streamStatus: StatusObject) {
|
||||
trace(
|
||||
this.trace(
|
||||
'LRS stream ended. code=' + streamStatus.code + ' details= ' + streamStatus.details
|
||||
);
|
||||
this.lrsCall = null;
|
||||
|
@ -716,42 +686,15 @@ export class XdsClient {
|
|||
}
|
||||
}
|
||||
|
||||
private maybeStartLrsStreamV3(): boolean {
|
||||
if (!this.lrsClient) {
|
||||
return false;
|
||||
}
|
||||
if (this.lrsCall) {
|
||||
return false;
|
||||
}
|
||||
this.lrsCall = this.lrsClient.streamLoadStats();
|
||||
this.receivedLrsSettingsForCurrentStream = false;
|
||||
this.lrsCall.on('data', (message: LoadStatsResponse__Output) => {
|
||||
this.handleLrsResponse(message);
|
||||
});
|
||||
this.lrsCall.on('status', (status: StatusObject) => {
|
||||
this.handleLrsCallStatus(status);
|
||||
});
|
||||
this.lrsCall.on('error', () => {});
|
||||
return true;
|
||||
}
|
||||
|
||||
private maybeStartLrsStream() {
|
||||
if (this.hasShutdown) {
|
||||
return;
|
||||
}
|
||||
if (this.adsState.eds.getResourceNames().length === 0 &&
|
||||
this.adsState.cds.getResourceNames().length === 0 &&
|
||||
this.adsState.rds.getResourceNames().length === 0 &&
|
||||
this.adsState.lds.getResourceNames().length === 0) {
|
||||
return;
|
||||
}
|
||||
if (!this.lrsClient) {
|
||||
if (!this.hasOutstandingResourceRequests()) {
|
||||
return;
|
||||
}
|
||||
if (this.lrsCall) {
|
||||
return;
|
||||
}
|
||||
this.lrsCall = this.lrsClient.streamLoadStats();
|
||||
this.maybeCreateClients();
|
||||
this.lrsCall = this.lrsClient!.streamLoadStats();
|
||||
this.receivedLrsSettingsForCurrentStream = false;
|
||||
this.lrsCall.on('data', (message: LoadStatsResponse__Output) => {
|
||||
this.handleLrsResponse(message);
|
||||
|
@ -760,7 +703,7 @@ export class XdsClient {
|
|||
this.handleLrsCallStatus(status);
|
||||
});
|
||||
this.lrsCall.on('error', () => {});
|
||||
trace('Starting LRS stream');
|
||||
this.trace('Starting LRS stream');
|
||||
this.lrsBackoff.runOnce();
|
||||
/* Send buffered stats information when starting LRS stream. If there is no
|
||||
* buffered stats information, it will still send the node field. */
|
||||
|
@ -844,7 +787,7 @@ export class XdsClient {
|
|||
}
|
||||
}
|
||||
}
|
||||
trace('Sending LRS stats ' + JSON.stringify(clusterStats, undefined, 2));
|
||||
this.trace('Sending LRS stats ' + JSON.stringify(clusterStats, undefined, 2));
|
||||
this.maybeSendLrsMessage(clusterStats);
|
||||
}
|
||||
|
||||
|
@ -852,7 +795,7 @@ export class XdsClient {
|
|||
edsServiceName: string,
|
||||
watcher: Watcher<ClusterLoadAssignment__Output>
|
||||
) {
|
||||
trace('Watcher added for endpoint ' + edsServiceName);
|
||||
this.trace('Watcher added for endpoint ' + edsServiceName);
|
||||
this.adsState.eds.addWatcher(edsServiceName, watcher);
|
||||
}
|
||||
|
||||
|
@ -860,37 +803,37 @@ export class XdsClient {
|
|||
edsServiceName: string,
|
||||
watcher: Watcher<ClusterLoadAssignment__Output>
|
||||
) {
|
||||
trace('Watcher removed for endpoint ' + edsServiceName);
|
||||
this.trace('Watcher removed for endpoint ' + edsServiceName);
|
||||
this.adsState.eds.removeWatcher(edsServiceName, watcher);
|
||||
}
|
||||
|
||||
addClusterWatcher(clusterName: string, watcher: Watcher<Cluster__Output>) {
|
||||
trace('Watcher added for cluster ' + clusterName);
|
||||
this.trace('Watcher added for cluster ' + clusterName);
|
||||
this.adsState.cds.addWatcher(clusterName, watcher);
|
||||
}
|
||||
|
||||
removeClusterWatcher(clusterName: string, watcher: Watcher<Cluster__Output>) {
|
||||
trace('Watcher removed for cluster ' + clusterName);
|
||||
this.trace('Watcher removed for cluster ' + clusterName);
|
||||
this.adsState.cds.removeWatcher(clusterName, watcher);
|
||||
}
|
||||
|
||||
addRouteWatcher(routeConfigName: string, watcher: Watcher<RouteConfiguration__Output>) {
|
||||
trace('Watcher added for route ' + routeConfigName);
|
||||
this.trace('Watcher added for route ' + routeConfigName);
|
||||
this.adsState.rds.addWatcher(routeConfigName, watcher);
|
||||
}
|
||||
|
||||
removeRouteWatcher(routeConfigName: string, watcher: Watcher<RouteConfiguration__Output>) {
|
||||
trace('Watcher removed for route ' + routeConfigName);
|
||||
this.trace('Watcher removed for route ' + routeConfigName);
|
||||
this.adsState.rds.removeWatcher(routeConfigName, watcher);
|
||||
}
|
||||
|
||||
addListenerWatcher(targetName: string, watcher: Watcher<Listener__Output>) {
|
||||
trace('Watcher added for listener ' + targetName);
|
||||
this.trace('Watcher added for listener ' + targetName);
|
||||
this.adsState.lds.addWatcher(targetName, watcher);
|
||||
}
|
||||
|
||||
removeListenerWatcher(targetName: string, watcher: Watcher<Listener__Output>) {
|
||||
trace('Watcher removed for listener ' + targetName);
|
||||
this.trace('Watcher removed for listener ' + targetName);
|
||||
this.adsState.lds.removeWatcher(targetName, watcher);
|
||||
}
|
||||
|
||||
|
@ -903,17 +846,10 @@ export class XdsClient {
|
|||
* @param edsServiceName
|
||||
*/
|
||||
addClusterDropStats(
|
||||
lrsServer: string,
|
||||
clusterName: string,
|
||||
edsServiceName: string
|
||||
): XdsClusterDropStats {
|
||||
trace('addClusterDropStats(lrsServer=' + lrsServer + ', clusterName=' + clusterName + ', edsServiceName=' + edsServiceName + ')');
|
||||
if (lrsServer !== '') {
|
||||
return {
|
||||
addUncategorizedCallDropped: () => {},
|
||||
addCallDropped: (category) => {},
|
||||
};
|
||||
}
|
||||
this.trace('addClusterDropStats(clusterName=' + clusterName + ', edsServiceName=' + edsServiceName + ')');
|
||||
const clusterStats = this.clusterStatsMap.getOrCreate(
|
||||
clusterName,
|
||||
edsServiceName
|
||||
|
@ -930,18 +866,11 @@ export class XdsClient {
|
|||
}
|
||||
|
||||
addClusterLocalityStats(
|
||||
lrsServer: string,
|
||||
clusterName: string,
|
||||
edsServiceName: string,
|
||||
locality: Locality__Output
|
||||
): XdsClusterLocalityStats {
|
||||
trace('addClusterLocalityStats(lrsServer=' + lrsServer + ', clusterName=' + clusterName + ', edsServiceName=' + edsServiceName + ', locality=' + JSON.stringify(locality) + ')');
|
||||
if (lrsServer !== '') {
|
||||
return {
|
||||
addCallStarted: () => {},
|
||||
addCallFinished: (fail) => {},
|
||||
};
|
||||
}
|
||||
this.trace('addClusterLocalityStats(clusterName=' + clusterName + ', edsServiceName=' + edsServiceName + ', locality=' + JSON.stringify(locality) + ')');
|
||||
const clusterStats = this.clusterStatsMap.getOrCreate(
|
||||
clusterName,
|
||||
edsServiceName
|
||||
|
@ -981,13 +910,194 @@ export class XdsClient {
|
|||
},
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
private shutdown(): void {
|
||||
this.adsCall?.cancel();
|
||||
this.adsClient?.close();
|
||||
this.lrsCall?.cancel();
|
||||
this.lrsClient?.close();
|
||||
this.hasShutdown = true;
|
||||
const KNOWN_SERVER_FEATURES = ['ignore_resource_deletion'];
|
||||
|
||||
function serverConfigEqual(config1: XdsServerConfig, config2: XdsServerConfig): boolean {
|
||||
if (config1.serverUri !== config2.serverUri) {
|
||||
return false;
|
||||
}
|
||||
for (const feature of KNOWN_SERVER_FEATURES) {
|
||||
if ((feature in config1.serverFeatures) !== (feature in config2.serverFeatures)) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
if (config1.channelCreds.length !== config2.channelCreds.length) {
|
||||
return false;
|
||||
}
|
||||
for (const [index, creds1] of config1.channelCreds.entries()) {
|
||||
const creds2 = config2.channelCreds[index];
|
||||
if (creds1.type !== creds2.type) {
|
||||
return false;
|
||||
}
|
||||
if (JSON.stringify(creds1) !== JSON.stringify(creds2)) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
interface ClientMapEntry {
|
||||
serverConfig: XdsServerConfig;
|
||||
client: XdsSingleServerClient;
|
||||
}
|
||||
|
||||
export class XdsClient {
|
||||
private clientMap: ClientMapEntry[] = [];
|
||||
|
||||
constructor(private bootstrapInfoOverride?: BootstrapInfo) {}
|
||||
|
||||
private getBootstrapInfo() {
|
||||
if (this.bootstrapInfoOverride) {
|
||||
return this.bootstrapInfoOverride;
|
||||
} else {
|
||||
return loadBootstrapInfo();
|
||||
}
|
||||
}
|
||||
|
||||
private getClient(serverConfig: XdsServerConfig): XdsSingleServerClient | null {
|
||||
for (const entry of this.clientMap) {
|
||||
if (serverConfigEqual(serverConfig, entry.serverConfig)) {
|
||||
return entry.client;
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
private getOrCreateClientForResource(resourceName: string): XdsSingleServerClient {
|
||||
const bootstrapInfo = this.getBootstrapInfo();
|
||||
let serverConfig: XdsServerConfig;
|
||||
if (resourceName.startsWith('xdstp:')) {
|
||||
const match = resourceName.match(/xdstp:\/\/([^/]+)\//);
|
||||
if (!match) {
|
||||
throw new Error(`Parse error: Resource ${resourceName} has no authority`);
|
||||
}
|
||||
const authority = match[1];
|
||||
if (authority in bootstrapInfo.authorities) {
|
||||
serverConfig = bootstrapInfo.authorities[authority].xdsServers?.[0] ?? bootstrapInfo.xdsServers[0];
|
||||
} else {
|
||||
throw new Error(`Authority ${authority} in resource ${resourceName} not found in authorities list`);
|
||||
}
|
||||
} else {
|
||||
serverConfig = bootstrapInfo.xdsServers[0];
|
||||
}
|
||||
for (const entry of this.clientMap) {
|
||||
if (serverConfigEqual(serverConfig, entry.serverConfig)) {
|
||||
return entry.client;
|
||||
}
|
||||
}
|
||||
const newClient = new XdsSingleServerClient(bootstrapInfo.node, serverConfig);
|
||||
this.clientMap.push({serverConfig: serverConfig, client: newClient});
|
||||
return newClient;
|
||||
}
|
||||
|
||||
addEndpointWatcher(
|
||||
edsServiceName: string,
|
||||
watcher: Watcher<ClusterLoadAssignment__Output>
|
||||
) {
|
||||
trace('addEndpointWatcher(' + edsServiceName + ')');
|
||||
try {
|
||||
const client = this.getOrCreateClientForResource(edsServiceName);
|
||||
client.addEndpointWatcher(edsServiceName, watcher);
|
||||
} catch (e) {
|
||||
trace('addEndpointWatcher error: ' + e.message);
|
||||
watcher.onTransientError({code: status.UNAVAILABLE, details: e.message, metadata: new Metadata()});
|
||||
}
|
||||
}
|
||||
|
||||
removeEndpointWatcher(
|
||||
edsServiceName: string,
|
||||
watcher: Watcher<ClusterLoadAssignment__Output>
|
||||
) {
|
||||
trace('removeEndpointWatcher(' + edsServiceName + ')');
|
||||
try {
|
||||
const client = this.getOrCreateClientForResource(edsServiceName);
|
||||
client.removeEndpointWatcher(edsServiceName, watcher);
|
||||
} catch (e) {
|
||||
}
|
||||
}
|
||||
|
||||
addClusterWatcher(clusterName: string, watcher: Watcher<Cluster__Output>) {
|
||||
trace('addClusterWatcher(' + clusterName + ')');
|
||||
try {
|
||||
const client = this.getOrCreateClientForResource(clusterName);
|
||||
client.addClusterWatcher(clusterName, watcher);
|
||||
} catch (e) {
|
||||
trace('addClusterWatcher error: ' + e.message);
|
||||
watcher.onTransientError({code: status.UNAVAILABLE, details: e.message, metadata: new Metadata()});
|
||||
}
|
||||
}
|
||||
|
||||
removeClusterWatcher(clusterName: string, watcher: Watcher<Cluster__Output>) {
|
||||
trace('removeClusterWatcher(' + clusterName + ')');
|
||||
try {
|
||||
const client = this.getOrCreateClientForResource(clusterName);
|
||||
client.removeClusterWatcher(clusterName, watcher);
|
||||
} catch (e) {
|
||||
}
|
||||
}
|
||||
|
||||
addRouteWatcher(routeConfigName: string, watcher: Watcher<RouteConfiguration__Output>) {
|
||||
trace('addRouteWatcher(' + routeConfigName + ')');
|
||||
try {
|
||||
const client = this.getOrCreateClientForResource(routeConfigName);
|
||||
client.addRouteWatcher(routeConfigName, watcher);
|
||||
} catch (e) {
|
||||
trace('addRouteWatcher error: ' + e.message);
|
||||
watcher.onTransientError({code: status.UNAVAILABLE, details: e.message, metadata: new Metadata()});
|
||||
}
|
||||
}
|
||||
|
||||
removeRouteWatcher(routeConfigName: string, watcher: Watcher<RouteConfiguration__Output>) {
|
||||
trace('removeRouteWatcher(' + routeConfigName + ')');
|
||||
try {
|
||||
const client = this.getOrCreateClientForResource(routeConfigName);
|
||||
client.removeRouteWatcher(routeConfigName, watcher);
|
||||
} catch (e) {
|
||||
}
|
||||
}
|
||||
|
||||
addListenerWatcher(targetName: string, watcher: Watcher<Listener__Output>) {
|
||||
trace('addListenerWatcher(' + targetName + ')');
|
||||
try {
|
||||
const client = this.getOrCreateClientForResource(targetName);
|
||||
client.addListenerWatcher(targetName, watcher);
|
||||
} catch (e) {
|
||||
trace('addListenerWatcher error: ' + e.message);
|
||||
watcher.onTransientError({code: status.UNAVAILABLE, details: e.message, metadata: new Metadata()});
|
||||
}
|
||||
}
|
||||
|
||||
removeListenerWatcher(targetName: string, watcher: Watcher<Listener__Output>) {
|
||||
trace('removeListenerWatcher' + targetName);
|
||||
try {
|
||||
const client = this.getOrCreateClientForResource(targetName);
|
||||
client.removeListenerWatcher(targetName, watcher);
|
||||
} catch (e) {
|
||||
}
|
||||
}
|
||||
|
||||
addClusterDropStats(lrsServer: XdsServerConfig, clusterName: string, edsServiceName: string): XdsClusterDropStats {
|
||||
const client = this.getClient(lrsServer);
|
||||
if (!client) {
|
||||
return {
|
||||
addUncategorizedCallDropped: () => {},
|
||||
addCallDropped: (category) => {},
|
||||
};
|
||||
}
|
||||
return client.addClusterDropStats(clusterName, edsServiceName);
|
||||
}
|
||||
|
||||
addClusterLocalityStats(lrsServer: XdsServerConfig, clusterName: string, edsServiceName: string, locality: Locality__Output): XdsClusterLocalityStats {
|
||||
const client = this.getClient(lrsServer);
|
||||
if (!client) {
|
||||
return {
|
||||
addCallStarted: () => {},
|
||||
addCallFinished: (fail) => {},
|
||||
};
|
||||
}
|
||||
return client.addClusterLocalityStats(clusterName, edsServiceName, locality);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,123 @@
|
|||
/*
|
||||
* Copyright 2023 gRPC authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*
|
||||
*/
|
||||
|
||||
import { BootstrapInfo, Node, validateBootstrapConfig } from "../src/xds-bootstrap";
|
||||
import { experimental } from "@grpc/grpc-js";
|
||||
import * as assert from 'assert';
|
||||
import GrpcUri = experimental.GrpcUri;
|
||||
import { getListenerResourceName } from "../src/resolver-xds";
|
||||
|
||||
const testNode: Node = {
|
||||
id: 'test',
|
||||
locality: {}
|
||||
};
|
||||
|
||||
describe('Listener resource name evaluation', () => {
|
||||
describe('No new bootstrap fields', () => {
|
||||
const bootstrap = validateBootstrapConfig({
|
||||
node: testNode,
|
||||
xds_servers: []
|
||||
});
|
||||
it('xds:server.example.com', () => {
|
||||
const target: GrpcUri = {
|
||||
scheme: 'xds',
|
||||
path: 'server.example.com'
|
||||
};
|
||||
assert.strictEqual(getListenerResourceName(bootstrap, target), 'server.example.com');
|
||||
});
|
||||
it('xds://xds.authority.com/server.example.com', () => {
|
||||
const target: GrpcUri = {
|
||||
scheme: 'xds',
|
||||
authority: 'xds.authority.com',
|
||||
path: 'server.example.com'
|
||||
};
|
||||
assert.throws(() => getListenerResourceName(bootstrap, target), /xds.authority.com/);
|
||||
});
|
||||
});
|
||||
describe('New-style names', () => {
|
||||
const bootstrap = validateBootstrapConfig({
|
||||
node: testNode,
|
||||
xds_servers: [],
|
||||
client_default_listener_resource_name_template: 'xdstp://xds.authority.com/envoy.config.listener.v3.Listener/%s',
|
||||
authorities: {
|
||||
'xds.authority.com': {}
|
||||
}
|
||||
});
|
||||
it('xds:server.example.com', () => {
|
||||
const target: GrpcUri = {
|
||||
scheme: 'xds',
|
||||
path: 'server.example.com'
|
||||
};
|
||||
assert.strictEqual(getListenerResourceName(bootstrap, target), 'xdstp://xds.authority.com/envoy.config.listener.v3.Listener/server.example.com');
|
||||
});
|
||||
it('xds://xds.authority.com/server.example.com', () => {
|
||||
const target: GrpcUri = {
|
||||
scheme: 'xds',
|
||||
authority: 'xds.authority.com',
|
||||
path: 'server.example.com'
|
||||
};
|
||||
assert.strictEqual(getListenerResourceName(bootstrap, target), 'xdstp://xds.authority.com/envoy.config.listener.v3.Listener/server.example.com');
|
||||
});
|
||||
});
|
||||
describe('Multiple authorities', () => {
|
||||
const bootstrap = validateBootstrapConfig({
|
||||
node: testNode,
|
||||
xds_servers: [{
|
||||
"server_uri": "xds-server.authority.com",
|
||||
"channel_creds": [ { "type": "google_default" } ]
|
||||
}],
|
||||
client_default_listener_resource_name_template: 'xdstp://xds.authority.com/envoy.config.listener.v3.Listener/grpc/client/%s?project_id=1234',
|
||||
authorities: {
|
||||
"xds.authority.com": {
|
||||
"client_listener_resource_name_template": "xdstp://xds.authority.com/envoy.config.listener.v3.Listener/grpc/client/%s?project_id=1234"
|
||||
},
|
||||
|
||||
"xds.other.com": {
|
||||
"xds_servers": [
|
||||
{
|
||||
"server_uri": "xds-server.other.com",
|
||||
"channel_creds": [ { "type": "google_default" } ]
|
||||
}
|
||||
]
|
||||
}
|
||||
}
|
||||
});
|
||||
it('xds:server.example.com', () => {
|
||||
const target: GrpcUri = {
|
||||
scheme: 'xds',
|
||||
path: 'server.example.com'
|
||||
};
|
||||
assert.strictEqual(getListenerResourceName(bootstrap, target), 'xdstp://xds.authority.com/envoy.config.listener.v3.Listener/grpc/client/server.example.com?project_id=1234');
|
||||
});
|
||||
it('xds://xds.authority.com/server.example.com', () => {
|
||||
const target: GrpcUri = {
|
||||
scheme: 'xds',
|
||||
authority: 'xds.authority.com',
|
||||
path: 'server.example.com'
|
||||
};
|
||||
assert.strictEqual(getListenerResourceName(bootstrap, target), 'xdstp://xds.authority.com/envoy.config.listener.v3.Listener/grpc/client/server.example.com?project_id=1234');
|
||||
});
|
||||
it('xds://xds.other.com/server.other.com', () => {
|
||||
const target: GrpcUri = {
|
||||
scheme: 'xds',
|
||||
authority: 'xds.other.com',
|
||||
path: 'server.other.com'
|
||||
};
|
||||
assert.strictEqual(getListenerResourceName(bootstrap, target), 'xdstp://xds.other.com/envoy.config.listener.v3.Listener/server.other.com');
|
||||
});
|
||||
});
|
||||
});
|
Loading…
Reference in New Issue