Implement federation support (continued)

This commit is contained in:
Michael Lumish 2023-04-28 10:07:12 -07:00
parent 2da3f868f2
commit 9d1b8493a2
20 changed files with 1665 additions and 251 deletions

View File

@ -16,7 +16,7 @@
*/
import { connectivityState, status, Metadata, logVerbosity, experimental } from '@grpc/grpc-js';
import { getSingletonXdsClient, XdsSingleServerClient } from './xds-client';
import { getSingletonXdsClient, XdsClient } from './xds-client';
import { Cluster__Output } from './generated/envoy/config/cluster/v3/Cluster';
import SubchannelAddress = experimental.SubchannelAddress;
import UnavailablePicker = experimental.UnavailablePicker;
@ -35,6 +35,7 @@ import { Duration__Output } from './generated/google/protobuf/Duration';
import { EXPERIMENTAL_OUTLIER_DETECTION } from './environment';
import { DiscoveryMechanism, XdsClusterResolverChildPolicyHandler, XdsClusterResolverLoadBalancingConfig } from './load-balancer-xds-cluster-resolver';
import { CLUSTER_CONFIG_TYPE_URL, decodeSingleResource } from './resources';
import { CdsUpdate, OutlierDetectionUpdate } from './xds-stream-state/cds-state';
const TRACER_NAME = 'cds_balancer';
@ -76,7 +77,7 @@ function durationToMs(duration: Duration__Output): number {
return (Number(duration.seconds) * 1_000 + duration.nanos / 1_000_000) | 0;
}
function translateOutlierDetectionConfig(outlierDetection: OutlierDetection__Output | null): OutlierDetectionLoadBalancingConfig | undefined {
function translateOutlierDetectionConfig(outlierDetection: OutlierDetectionUpdate | undefined): OutlierDetectionLoadBalancingConfig | undefined {
if (!EXPERIMENTAL_OUTLIER_DETECTION) {
return undefined;
}
@ -84,42 +85,20 @@ function translateOutlierDetectionConfig(outlierDetection: OutlierDetection__Out
/* No-op outlier detection config, with all fields unset. */
return new OutlierDetectionLoadBalancingConfig(null, null, null, null, null, null, []);
}
let successRateConfig: Partial<SuccessRateEjectionConfig> | null = null;
/* Success rate ejection is enabled by default, so we only disable it if
* enforcing_success_rate is set and it has the value 0 */
if (!outlierDetection.enforcing_success_rate || outlierDetection.enforcing_success_rate.value > 0) {
successRateConfig = {
enforcement_percentage: outlierDetection.enforcing_success_rate?.value,
minimum_hosts: outlierDetection.success_rate_minimum_hosts?.value,
request_volume: outlierDetection.success_rate_request_volume?.value,
stdev_factor: outlierDetection.success_rate_stdev_factor?.value
};
}
let failurePercentageConfig: Partial<FailurePercentageEjectionConfig> | null = null;
/* Failure percentage ejection is disabled by default, so we only enable it
* if enforcing_failure_percentage is set and it has a value greater than 0 */
if (outlierDetection.enforcing_failure_percentage && outlierDetection.enforcing_failure_percentage.value > 0) {
failurePercentageConfig = {
enforcement_percentage: outlierDetection.enforcing_failure_percentage.value,
minimum_hosts: outlierDetection.failure_percentage_minimum_hosts?.value,
request_volume: outlierDetection.failure_percentage_request_volume?.value,
threshold: outlierDetection.failure_percentage_threshold?.value
}
}
return new OutlierDetectionLoadBalancingConfig(
outlierDetection.interval ? durationToMs(outlierDetection.interval) : null,
outlierDetection.base_ejection_time ? durationToMs(outlierDetection.base_ejection_time) : null,
outlierDetection.max_ejection_time ? durationToMs(outlierDetection.max_ejection_time) : null,
outlierDetection.max_ejection_percent?.value ?? null,
successRateConfig,
failurePercentageConfig,
outlierDetection.intervalMs,
outlierDetection.baseEjectionTimeMs,
outlierDetection.maxEjectionTimeMs,
outlierDetection.maxEjectionPercent,
outlierDetection.successRateConfig,
outlierDetection.failurePercentageConfig,
[]
);
}
interface ClusterEntry {
watcher: Watcher<Cluster__Output>;
latestUpdate?: Cluster__Output;
watcher: Watcher<CdsUpdate>;
latestUpdate?: CdsUpdate;
children: string[];
}
@ -144,34 +123,19 @@ function isClusterTreeFullyUpdated(tree: ClusterTree, root: string): boolean {
return true;
}
function generateDiscoveryMechanismForCluster(config: Cluster__Output): DiscoveryMechanism {
let maxConcurrentRequests: number | undefined = undefined;
for (const threshold of config.circuit_breakers?.thresholds ?? []) {
if (threshold.priority === 'DEFAULT') {
maxConcurrentRequests = threshold.max_requests?.value;
}
}
if (config.type === 'EDS') {
// EDS cluster
return {
cluster: config.name,
lrs_load_reporting_server_name: config.lrs_server?.self ? '' : undefined,
max_concurrent_requests: maxConcurrentRequests,
type: 'EDS',
eds_service_name: config.eds_cluster_config!.service_name === '' ? undefined : config.eds_cluster_config!.service_name,
outlier_detection: translateOutlierDetectionConfig(config.outlier_detection)
};
} else {
// Logical DNS cluster
const socketAddress = config.load_assignment!.endpoints[0].lb_endpoints[0].endpoint!.address!.socket_address!;
return {
cluster: config.name,
lrs_load_reporting_server_name: config.lrs_server?.self ? '' : undefined,
max_concurrent_requests: maxConcurrentRequests,
type: 'LOGICAL_DNS',
dns_hostname: `${socketAddress.address}:${socketAddress.port_value}`
};
function generateDiscoverymechanismForCdsUpdate(config: CdsUpdate): DiscoveryMechanism {
if (config.type === 'AGGREGATE') {
throw new Error('Cannot generate DiscoveryMechanism for AGGREGATE cluster');
}
return {
cluster: config.name,
lrs_load_reporting_server: config.lrsLoadReportingServer,
max_concurrent_requests: config.maxConcurrentRequests,
type: config.type,
eds_service_name: config.edsServiceName,
dns_hostname: config.dnsHostname,
outlier_detection: translateOutlierDetectionConfig(config.outlierDetectionUpdate)
};
}
const RECURSION_DEPTH_LIMIT = 15;
@ -203,7 +167,7 @@ function getDiscoveryMechanismList(tree: ClusterTree, root: string): DiscoveryMe
trace('Visit leaf ' + node);
// individual cluster
const config = tree[node].latestUpdate!;
return [generateDiscoveryMechanismForCluster(config)];
return [generateDiscoverymechanismForCdsUpdate(config)];
}
}
return getDiscoveryMechanismListHelper(root, 0);
@ -216,7 +180,7 @@ export class CdsLoadBalancer implements LoadBalancer {
private latestConfig: CdsLoadBalancingConfig | null = null;
private latestAttributes: { [key: string]: unknown } = {};
private xdsClient: XdsSingleServerClient | null = null;
private xdsClient: XdsClient | null = null;
private clusterTree: ClusterTree = {};
@ -231,11 +195,11 @@ export class CdsLoadBalancer implements LoadBalancer {
return;
}
trace('Adding watcher for cluster ' + cluster);
const watcher: Watcher<Cluster__Output> = {
const watcher: Watcher<CdsUpdate> = {
onValidUpdate: (update) => {
this.clusterTree[cluster].latestUpdate = update;
if (update.cluster_discovery_type === 'cluster_type') {
const children = decodeSingleResource(CLUSTER_CONFIG_TYPE_URL, update.cluster_type!.typed_config!.value).clusters;
if (update.type === 'AGGREGATE') {
const children = update.aggregateChildren
trace('Received update for aggregate cluster ' + cluster + ' with children [' + children + ']');
this.clusterTree[cluster].children = children;
children.forEach(child => this.addCluster(child));
@ -315,7 +279,7 @@ export class CdsLoadBalancer implements LoadBalancer {
}
trace('Received update with config ' + JSON.stringify(lbConfig, undefined, 2));
this.latestAttributes = attributes;
this.xdsClient = attributes.xdsClient as XdsSingleServerClient;
this.xdsClient = attributes.xdsClient as XdsClient;
/* If the cluster is changing, disable the old watcher before adding the new
* one */

View File

@ -17,7 +17,8 @@
import { connectivityState as ConnectivityState, StatusObject, status as Status, experimental } from '@grpc/grpc-js';
import { Locality__Output } from './generated/envoy/config/core/v3/Locality';
import { XdsClusterLocalityStats, XdsSingleServerClient, getSingletonXdsClient } from './xds-client';
import { validateXdsServerConfig, XdsServerConfig } from './xds-bootstrap';
import { XdsClusterLocalityStats, XdsClient, getSingletonXdsClient } from './xds-client';
import LoadBalancer = experimental.LoadBalancer;
import ChannelControlHelper = experimental.ChannelControlHelper;
import registerLoadBalancerType = experimental.registerLoadBalancerType;
@ -46,14 +47,14 @@ export class LrsLoadBalancingConfig implements LoadBalancingConfig {
[TYPE_NAME]: {
cluster_name: this.clusterName,
eds_service_name: this.edsServiceName,
lrs_load_reporting_server_name: this.lrsLoadReportingServerName,
lrs_load_reporting_server_name: this.lrsLoadReportingServer,
locality: this.locality,
child_policy: this.childPolicy.map(policy => policy.toJsonObject())
}
}
}
constructor(private clusterName: string, private edsServiceName: string, private lrsLoadReportingServerName: string, private locality: Locality__Output, private childPolicy: LoadBalancingConfig[]) {}
constructor(private clusterName: string, private edsServiceName: string, private lrsLoadReportingServer: XdsServerConfig, private locality: Locality__Output, private childPolicy: LoadBalancingConfig[]) {}
getClusterName() {
return this.clusterName;
@ -63,8 +64,8 @@ export class LrsLoadBalancingConfig implements LoadBalancingConfig {
return this.edsServiceName;
}
getLrsLoadReportingServerName() {
return this.lrsLoadReportingServerName;
getLrsLoadReportingServer() {
return this.lrsLoadReportingServer;
}
getLocality() {
@ -82,9 +83,6 @@ export class LrsLoadBalancingConfig implements LoadBalancingConfig {
if (!('eds_service_name' in obj && typeof obj.eds_service_name === 'string')) {
throw new Error('lrs config must have a string field eds_service_name');
}
if (!('lrs_load_reporting_server_name' in obj && typeof obj.lrs_load_reporting_server_name === 'string')) {
throw new Error('lrs config must have a string field lrs_load_reporting_server_name');
}
if (!('locality' in obj && obj.locality !== null && typeof obj.locality === 'object')) {
throw new Error('lrs config must have an object field locality');
}
@ -100,7 +98,7 @@ export class LrsLoadBalancingConfig implements LoadBalancingConfig {
if (!('child_policy' in obj && Array.isArray(obj.child_policy))) {
throw new Error('lrs config must have a child_policy array');
}
return new LrsLoadBalancingConfig(obj.cluster_name, obj.eds_service_name, obj.lrs_load_reporting_server_name, {
return new LrsLoadBalancingConfig(obj.cluster_name, obj.eds_service_name, validateXdsServerConfig(obj.lrs_load_reporting_server), {
region: obj.locality.region ?? '',
zone: obj.locality.zone ?? '',
sub_zone: obj.locality.sub_zone ?? ''
@ -169,8 +167,8 @@ export class LrsLoadBalancer implements LoadBalancer {
if (!(lbConfig instanceof LrsLoadBalancingConfig)) {
return;
}
this.localityStatsReporter = (attributes.xdsClient as XdsSingleServerClient).addClusterLocalityStats(
lbConfig.getLrsLoadReportingServerName(),
this.localityStatsReporter = (attributes.xdsClient as XdsClient).addClusterLocalityStats(
lbConfig.getLrsLoadReportingServer(),
lbConfig.getClusterName(),
lbConfig.getEdsServiceName(),
lbConfig.getLocality()

View File

@ -16,7 +16,8 @@
*/
import { experimental, logVerbosity, status as Status, Metadata, connectivityState } from "@grpc/grpc-js";
import { getSingletonXdsClient, XdsSingleServerClient, XdsClusterDropStats } from "./xds-client";
import { validateXdsServerConfig, XdsServerConfig } from "./xds-bootstrap";
import { getSingletonXdsClient, XdsClient, XdsClusterDropStats } from "./xds-client";
import LoadBalancingConfig = experimental.LoadBalancingConfig;
import validateLoadBalancingConfig = experimental.validateLoadBalancingConfig;
@ -72,15 +73,15 @@ export class XdsClusterImplLoadBalancingConfig implements LoadBalancingConfig {
if (this.edsServiceName !== undefined) {
jsonObj.eds_service_name = this.edsServiceName;
}
if (this.lrsLoadReportingServerName !== undefined) {
jsonObj.lrs_load_reporting_server_name = this.lrsLoadReportingServerName;
if (this.lrsLoadReportingServer !== undefined) {
jsonObj.lrs_load_reporting_server_name = this.lrsLoadReportingServer;
}
return {
[TYPE_NAME]: jsonObj
};
}
constructor(private cluster: string, private dropCategories: DropCategory[], private childPolicy: LoadBalancingConfig[], private edsServiceName?: string, private lrsLoadReportingServerName?: string, maxConcurrentRequests?: number) {
constructor(private cluster: string, private dropCategories: DropCategory[], private childPolicy: LoadBalancingConfig[], private edsServiceName?: string, private lrsLoadReportingServer?: XdsServerConfig, maxConcurrentRequests?: number) {
this.maxConcurrentRequests = maxConcurrentRequests ?? DEFAULT_MAX_CONCURRENT_REQUESTS;
}
@ -92,8 +93,8 @@ export class XdsClusterImplLoadBalancingConfig implements LoadBalancingConfig {
return this.edsServiceName;
}
getLrsLoadReportingServerName() {
return this.lrsLoadReportingServerName;
getLrsLoadReportingServer() {
return this.lrsLoadReportingServer;
}
getMaxConcurrentRequests() {
@ -115,9 +116,6 @@ export class XdsClusterImplLoadBalancingConfig implements LoadBalancingConfig {
if ('eds_service_name' in obj && !(obj.eds_service_name === undefined || typeof obj.eds_service_name === 'string')) {
throw new Error('xds_cluster_impl config eds_service_name field must be a string if provided');
}
if ('lrs_load_reporting_server_name' in obj && (!obj.lrs_load_reporting_server_name === undefined || typeof obj.lrs_load_reporting_server_name === 'string')) {
throw new Error('xds_cluster_impl config lrs_load_reporting_server_name must be a string if provided');
}
if ('max_concurrent_requests' in obj && (!obj.max_concurrent_requests === undefined || typeof obj.max_concurrent_requests === 'number')) {
throw new Error('xds_cluster_impl config max_concurrent_requests must be a number if provided');
}
@ -127,7 +125,7 @@ export class XdsClusterImplLoadBalancingConfig implements LoadBalancingConfig {
if (!('child_policy' in obj && Array.isArray(obj.child_policy))) {
throw new Error('xds_cluster_impl config must have an array field child_policy');
}
return new XdsClusterImplLoadBalancingConfig(obj.cluster, obj.drop_categories.map(validateDropCategory), obj.child_policy.map(validateLoadBalancingConfig), obj.eds_service_name, obj.lrs_load_reporting_server_name, obj.max_concurrent_requests);
return new XdsClusterImplLoadBalancingConfig(obj.cluster, obj.drop_categories.map(validateDropCategory), obj.child_policy.map(validateLoadBalancingConfig), obj.eds_service_name, obj.lrs_load_reporting_server ? validateXdsServerConfig(obj.lrs_load_reporting_server) : undefined, obj.max_concurrent_requests);
}
}
@ -222,7 +220,7 @@ class XdsClusterImplBalancer implements LoadBalancer {
private childBalancer: ChildLoadBalancerHandler;
private latestConfig: XdsClusterImplLoadBalancingConfig | null = null;
private clusterDropStats: XdsClusterDropStats | null = null;
private xdsClient: XdsSingleServerClient | null = null;
private xdsClient: XdsClient | null = null;
constructor(private readonly channelControlHelper: ChannelControlHelper) {
this.childBalancer = new ChildLoadBalancerHandler(createChildChannelControlHelper(channelControlHelper, {
@ -243,11 +241,11 @@ class XdsClusterImplBalancer implements LoadBalancer {
}
trace('Received update with config: ' + JSON.stringify(lbConfig, undefined, 2));
this.latestConfig = lbConfig;
this.xdsClient = attributes.xdsClient as XdsSingleServerClient;
this.xdsClient = attributes.xdsClient as XdsClient;
if (lbConfig.getLrsLoadReportingServerName()) {
if (lbConfig.getLrsLoadReportingServer()) {
this.clusterDropStats = this.xdsClient.addClusterDropStats(
lbConfig.getLrsLoadReportingServerName()!,
lbConfig.getLrsLoadReportingServer()!,
lbConfig.getCluster(),
lbConfig.getEdsServiceName() ?? ''
);

View File

@ -23,7 +23,7 @@ import { ClusterLoadAssignment__Output } from "./generated/envoy/config/endpoint
import { LrsLoadBalancingConfig } from "./load-balancer-lrs";
import { LocalitySubchannelAddress, PriorityChild, PriorityLoadBalancingConfig } from "./load-balancer-priority";
import { WeightedTarget, WeightedTargetLoadBalancingConfig } from "./load-balancer-weighted-target";
import { getSingletonXdsClient, XdsSingleServerClient } from "./xds-client";
import { getSingletonXdsClient, XdsClient } from "./xds-client";
import { DropCategory, XdsClusterImplLoadBalancingConfig } from "./load-balancer-xds-cluster-impl";
import { Watcher } from "./xds-stream-state/xds-stream-state";
@ -37,6 +37,7 @@ import createResolver = experimental.createResolver;
import ChannelControlHelper = experimental.ChannelControlHelper;
import OutlierDetectionLoadBalancingConfig = experimental.OutlierDetectionLoadBalancingConfig;
import subchannelAddressToString = experimental.subchannelAddressToString;
import { serverConfigEqual, validateXdsServerConfig, XdsServerConfig } from "./xds-bootstrap";
const TRACER_NAME = 'xds_cluster_resolver';
@ -46,7 +47,7 @@ function trace(text: string): void {
export interface DiscoveryMechanism {
cluster: string;
lrs_load_reporting_server_name?: string;
lrs_load_reporting_server?: XdsServerConfig;
max_concurrent_requests?: number;
type: 'EDS' | 'LOGICAL_DNS';
eds_service_name?: string;
@ -61,9 +62,6 @@ function validateDiscoveryMechanism(obj: any): DiscoveryMechanism {
if (!('type' in obj && (obj.type === 'EDS' || obj.type === 'LOGICAL_DNS'))) {
throw new Error('discovery_mechanisms entry must have a field "type" with the value "EDS" or "LOGICAL_DNS"');
}
if ('lrs_load_reporting_server_name' in obj && typeof obj.lrs_load_reporting_server_name !== 'string') {
throw new Error('discovery_mechanisms entry lrs_load_reporting_server_name field must be a string if provided');
}
if ('max_concurrent_requests' in obj && typeof obj.max_concurrent_requests !== "number") {
throw new Error('discovery_mechanisms entry max_concurrent_requests field must be a number if provided');
}
@ -78,7 +76,7 @@ function validateDiscoveryMechanism(obj: any): DiscoveryMechanism {
if (!(outlierDetectionConfig instanceof OutlierDetectionLoadBalancingConfig)) {
throw new Error('eds config outlier_detection must be a valid outlier detection config if provided');
}
return {...obj, outlier_detection: outlierDetectionConfig};
return {...obj, lrs_load_reporting_server: validateXdsServerConfig(obj.lrs_load_reporting_server), outlier_detection: outlierDetectionConfig};
}
return obj;
}
@ -243,7 +241,7 @@ export class XdsClusterResolver implements LoadBalancer {
private discoveryMechanismList: DiscoveryMechanismEntry[] = [];
private latestConfig: XdsClusterResolverLoadBalancingConfig | null = null;
private latestAttributes: { [key: string]: unknown; } = {};
private xdsClient: XdsSingleServerClient | null = null;
private xdsClient: XdsClient | null = null;
private childBalancer: ChildLoadBalancerHandler;
constructor(private readonly channelControlHelper: ChannelControlHelper) {
@ -313,8 +311,8 @@ export class XdsClusterResolver implements LoadBalancer {
const childTargets = new Map<string, WeightedTarget>();
for (const localityObj of priorityEntry.localities) {
let childPolicy: LoadBalancingConfig[];
if (entry.discoveryMechanism.lrs_load_reporting_server_name !== undefined) {
childPolicy = [new LrsLoadBalancingConfig(entry.discoveryMechanism.cluster, entry.discoveryMechanism.eds_service_name ?? '', entry.discoveryMechanism.lrs_load_reporting_server_name!, localityObj.locality, endpointPickingPolicy)];
if (entry.discoveryMechanism.lrs_load_reporting_server !== undefined) {
childPolicy = [new LrsLoadBalancingConfig(entry.discoveryMechanism.cluster, entry.discoveryMechanism.eds_service_name ?? '', entry.discoveryMechanism.lrs_load_reporting_server, localityObj.locality, endpointPickingPolicy)];
} else {
childPolicy = endpointPickingPolicy;
}
@ -334,7 +332,7 @@ export class XdsClusterResolver implements LoadBalancer {
newLocalityPriorities.set(localityToName(localityObj.locality), priority);
}
const weightedTargetConfig = new WeightedTargetLoadBalancingConfig(childTargets);
const xdsClusterImplConfig = new XdsClusterImplLoadBalancingConfig(entry.discoveryMechanism.cluster, priorityEntry.dropCategories, [weightedTargetConfig], entry.discoveryMechanism.eds_service_name, entry.discoveryMechanism.lrs_load_reporting_server_name, entry.discoveryMechanism.max_concurrent_requests);
const xdsClusterImplConfig = new XdsClusterImplLoadBalancingConfig(entry.discoveryMechanism.cluster, priorityEntry.dropCategories, [weightedTargetConfig], entry.discoveryMechanism.eds_service_name, entry.discoveryMechanism.lrs_load_reporting_server, entry.discoveryMechanism.max_concurrent_requests);
let outlierDetectionConfig: OutlierDetectionLoadBalancingConfig | undefined;
if (EXPERIMENTAL_OUTLIER_DETECTION) {
outlierDetectionConfig = entry.discoveryMechanism.outlier_detection?.copyWithChildPolicy([xdsClusterImplConfig]);
@ -368,7 +366,7 @@ export class XdsClusterResolver implements LoadBalancer {
trace('Received update with config ' + JSON.stringify(lbConfig, undefined, 2));
this.latestConfig = lbConfig;
this.latestAttributes = attributes;
this.xdsClient = attributes.xdsClient as XdsSingleServerClient;
this.xdsClient = attributes.xdsClient as XdsClient;
if (this.discoveryMechanismList.length === 0) {
for (const mechanism of lbConfig.getDiscoveryMechanisms()) {
const mechanismEntry: DiscoveryMechanismEntry = {
@ -448,6 +446,14 @@ export class XdsClusterResolver implements LoadBalancer {
}
}
function maybeServerConfigEqual(config1: XdsServerConfig | undefined, config2: XdsServerConfig | undefined) {
if (config1 !== undefined && config2 !== undefined) {
return serverConfigEqual(config1, config2);
} else {
return config1 === config2;
}
}
export class XdsClusterResolverChildPolicyHandler extends ChildLoadBalancerHandler {
protected configUpdateRequiresNewPolicyInstance(oldConfig: LoadBalancingConfig, newConfig: LoadBalancingConfig): boolean {
if (!(oldConfig instanceof XdsClusterResolverLoadBalancingConfig && newConfig instanceof XdsClusterResolverLoadBalancingConfig)) {
@ -463,7 +469,7 @@ export class XdsClusterResolverChildPolicyHandler extends ChildLoadBalancerHandl
oldDiscoveryMechanism.cluster !== newDiscoveryMechanism.cluster ||
oldDiscoveryMechanism.eds_service_name !== newDiscoveryMechanism.eds_service_name ||
oldDiscoveryMechanism.dns_hostname !== newDiscoveryMechanism.dns_hostname ||
oldDiscoveryMechanism.lrs_load_reporting_server_name !== newDiscoveryMechanism.lrs_load_reporting_server_name) {
!maybeServerConfigEqual(oldDiscoveryMechanism.lrs_load_reporting_server, newDiscoveryMechanism.lrs_load_reporting_server)) {
return true;
}
}

View File

@ -18,7 +18,7 @@ import * as protoLoader from '@grpc/proto-loader';
import { RE2 } from 're2-wasm';
import { getSingletonXdsClient, XdsSingleServerClient } from './xds-client';
import { getSingletonXdsClient, XdsClient } from './xds-client';
import { StatusObject, status, logVerbosity, Metadata, experimental, ChannelOptions } from '@grpc/grpc-js';
import Resolver = experimental.Resolver;
import GrpcUri = experimental.GrpcUri;
@ -273,7 +273,7 @@ class XdsResolver implements Resolver {
private bootstrapInfo: BootstrapInfo | null = null;
private xdsClient: XdsSingleServerClient;
private xdsClient: XdsClient;
constructor(
private target: GrpcUri,
@ -283,7 +283,7 @@ class XdsResolver implements Resolver {
if (channelOptions[BOOTSTRAP_CONFIG_KEY]) {
const parsedConfig = JSON.parse(channelOptions[BOOTSTRAP_CONFIG_KEY]);
this.bootstrapInfo = validateBootstrapConfig(parsedConfig);
this.xdsClient = new XdsSingleServerClient(this.bootstrapInfo);
this.xdsClient = new XdsClient(this.bootstrapInfo);
} else {
this.xdsClient = getSingletonXdsClient();
}

View File

@ -0,0 +1,75 @@
/*
* Copyright 2023 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
import { XdsResourceKey, xdsResourceKeyEqual, XdsResourceName } from "./resources";
interface ResourceCacheEntry<ResourceType> {
key: XdsResourceKey;
value: ResourceType;
refCount: number;
}
export class ResourceCache<ResourceType> {
/**
* Map authority to a list of key/value pairs
*/
private cache: Map<string, ResourceCacheEntry<ResourceType>[]> = new Map();
getAndRef(name: XdsResourceName): ResourceType | undefined {
const mapEntry = this.cache.get(name.authority);
if (!mapEntry) {
return undefined;
}
for (const entry of mapEntry) {
if (xdsResourceKeyEqual(name.key, entry.key)) {
entry.refCount += 1;
return entry.value;
}
}
return undefined;
}
set(name: XdsResourceName, value: ResourceType): void {
const mapEntry = this.cache.get(name.authority);
if (!mapEntry) {
this.cache.set(name.authority, [{key: name.key, value: value, refCount: 1}]);
return;
}
for (const entry of mapEntry) {
if (xdsResourceKeyEqual(name.key, entry.key)) {
entry.value = value;
return;
}
}
mapEntry.push({key: name.key, value: value, refCount: 1});
}
unref(name: XdsResourceName): void {
const mapEntry = this.cache.get(name.authority);
if (!mapEntry) {
return;
}
for (let i = 0; i < mapEntry.length; i++) {
if (xdsResourceKeyEqual(name.key, mapEntry[i].key)) {
mapEntry[i].refCount -= 1;
if (mapEntry[i].refCount === 0) {
mapEntry.splice(i, 1);
}
return;
}
}
}
}

View File

@ -16,6 +16,7 @@
*/
// This is a non-public, unstable API, but it's very convenient
import { URI } from 'vscode-uri';
import { loadProtosWithOptionsSync } from '@grpc/proto-loader/build/src/util';
import { Cluster__Output } from './generated/envoy/config/cluster/v3/Cluster';
import { ClusterLoadAssignment__Output } from './generated/envoy/config/endpoint/v3/ClusterLoadAssignment';
@ -23,6 +24,7 @@ import { Listener__Output } from './generated/envoy/config/listener/v3/Listener'
import { RouteConfiguration__Output } from './generated/envoy/config/route/v3/RouteConfiguration';
import { ClusterConfig__Output } from './generated/envoy/extensions/clusters/aggregate/v3/ClusterConfig';
import { HttpConnectionManager__Output } from './generated/envoy/extensions/filters/network/http_connection_manager/v3/HttpConnectionManager';
import { EXPERIMENTAL_FEDERATION } from './environment';
export const EDS_TYPE_URL = 'type.googleapis.com/envoy.config.endpoint.v3.ClusterLoadAssignment';
export const CDS_TYPE_URL = 'type.googleapis.com/envoy.config.cluster.v3.Cluster';
@ -94,4 +96,44 @@ export function decodeSingleResource<T extends AdsTypeUrl | HttpConnectionManage
} else {
throw new Error(`ADS Error: unknown resource type ${targetTypeUrl}`);
}
}
export interface XdsResourceName {
authority: string;
key: string;
}
function stripStringPrefix(value: string, prefix: string): string {
if (value.startsWith(prefix)) {
return value.substring(prefix.length);
} else {
return value;
}
}
export function parseXdsResourceName(name: string, typeUrl: string): XdsResourceName {
if (!EXPERIMENTAL_FEDERATION || !name.startsWith('xdstp:')) {
return {
authority: 'old:',
key: name
};
}
const uri = URI.parse(name);
const pathComponents = stripStringPrefix(uri.path, '/').split('/', 2);
if (pathComponents[0] !== typeUrl) {
throw new Error('xdstp URI path must indicate valid xDS resource type');
}
const queryParams = uri.query.split('&');
queryParams.sort();
return {
authority: uri.authority,
key: `${pathComponents[1]}?${queryParams.join('&')}`
};
}
export function xdsResourceNameToString(name: XdsResourceName, typeUrl: string): string {
if (name.authority === 'old:') {
return name.key;
}
return `xdstp://${name.authority}/${typeUrl}/${name.key}`;
}

View File

@ -58,6 +58,32 @@ export interface BootstrapInfo {
clientDefaultListenerResourceNameTemplate: string;
}
const KNOWN_SERVER_FEATURES = ['ignore_resource_deletion'];
export function serverConfigEqual(config1: XdsServerConfig, config2: XdsServerConfig): boolean {
if (config1.serverUri !== config2.serverUri) {
return false;
}
for (const feature of KNOWN_SERVER_FEATURES) {
if ((feature in config1.serverFeatures) !== (feature in config2.serverFeatures)) {
return false;
}
}
if (config1.channelCreds.length !== config2.channelCreds.length) {
return false;
}
for (const [index, creds1] of config1.channelCreds.entries()) {
const creds2 = config2.channelCreds[index];
if (creds1.type !== creds2.type) {
return false;
}
if (JSON.stringify(creds1) !== JSON.stringify(creds2)) {
return false;
}
}
return true;
}
function validateChannelCredsConfig(obj: any): ChannelCredsConfig {
if (!('type' in obj)) {
throw new Error('type field missing in xds_servers.channel_creds element');
@ -80,7 +106,7 @@ function validateChannelCredsConfig(obj: any): ChannelCredsConfig {
};
}
function validateXdsServerConfig(obj: any): XdsServerConfig {
export function validateXdsServerConfig(obj: any): XdsServerConfig {
if (!('server_uri' in obj)) {
throw new Error('server_uri field missing in xds_servers element');
}

View File

@ -21,7 +21,7 @@ import { loadProtosWithOptionsSync } from '@grpc/proto-loader/build/src/util';
import { loadPackageDefinition, StatusObject, status, logVerbosity, Metadata, experimental, ChannelOptions, ClientDuplexStream, ServiceError, ChannelCredentials, Channel, connectivityState } from '@grpc/grpc-js';
import * as adsTypes from './generated/ads';
import * as lrsTypes from './generated/lrs';
import { BootstrapInfo, loadBootstrapInfo, XdsServerConfig } from './xds-bootstrap';
import { BootstrapInfo, loadBootstrapInfo, serverConfigEqual, XdsServerConfig } from './xds-bootstrap';
import { Node } from './generated/envoy/config/core/v3/Node';
import { AggregatedDiscoveryServiceClient } from './generated/envoy/service/discovery/v3/AggregatedDiscoveryService';
import { DiscoveryRequest } from './generated/envoy/service/discovery/v3/DiscoveryRequest';
@ -37,7 +37,7 @@ import ServiceConfig = experimental.ServiceConfig;
import { createGoogleDefaultCredentials } from './google-default-credentials';
import { CdsLoadBalancingConfig } from './load-balancer-cds';
import { EdsState } from './xds-stream-state/eds-state';
import { CdsState } from './xds-stream-state/cds-state';
import { CdsState, CdsUpdate } from './xds-stream-state/cds-state';
import { RdsState } from './xds-stream-state/rds-state';
import { LdsState } from './xds-stream-state/lds-state';
import { HandleResponseResult, ResourcePair, Watcher } from './xds-stream-state/xds-stream-state';
@ -47,6 +47,7 @@ import { RouteConfiguration__Output } from './generated/envoy/config/route/v3/Ro
import { Duration } from './generated/google/protobuf/Duration';
import { AdsOutputType, AdsTypeUrl, CDS_TYPE_URL, decodeSingleResource, EDS_TYPE_URL, LDS_TYPE_URL, RDS_TYPE_URL } from './resources';
import { setCsdsClientNode, updateCsdsRequestedNameList, updateCsdsResourceResponse } from './csds';
import { EXPERIMENTAL_FEDERATION } from './environment';
const TRACER_NAME = 'xds_client';
@ -267,16 +268,16 @@ class XdsSingleServerClient {
private lrsBackoff: BackoffTimeout;
constructor(bootstrapNode: Node, private xdsServerConfig: XdsServerConfig) {
const edsState = new EdsState(() => {
const edsState = new EdsState(xdsServerConfig, () => {
this.updateNames('eds');
});
const cdsState = new CdsState(() => {
const cdsState = new CdsState(xdsServerConfig, () => {
this.updateNames('cds');
});
const rdsState = new RdsState(() => {
const rdsState = new RdsState(xdsServerConfig, () => {
this.updateNames('rds');
});
const ldsState = new LdsState(rdsState, () => {
const ldsState = new LdsState(xdsServerConfig, rdsState, () => {
this.updateNames('lds');
});
this.adsState = {
@ -807,12 +808,12 @@ class XdsSingleServerClient {
this.adsState.eds.removeWatcher(edsServiceName, watcher);
}
addClusterWatcher(clusterName: string, watcher: Watcher<Cluster__Output>) {
addClusterWatcher(clusterName: string, watcher: Watcher<CdsUpdate>) {
this.trace('Watcher added for cluster ' + clusterName);
this.adsState.cds.addWatcher(clusterName, watcher);
}
removeClusterWatcher(clusterName: string, watcher: Watcher<Cluster__Output>) {
removeClusterWatcher(clusterName: string, watcher: Watcher<CdsUpdate>) {
this.trace('Watcher removed for cluster ' + clusterName);
this.adsState.cds.removeWatcher(clusterName, watcher);
}
@ -912,31 +913,24 @@ class XdsSingleServerClient {
}
}
const KNOWN_SERVER_FEATURES = ['ignore_resource_deletion'];
function serverConfigEqual(config1: XdsServerConfig, config2: XdsServerConfig): boolean {
if (config1.serverUri !== config2.serverUri) {
return false;
}
for (const feature of KNOWN_SERVER_FEATURES) {
if ((feature in config1.serverFeatures) !== (feature in config2.serverFeatures)) {
return false;
}
}
if (config1.channelCreds.length !== config2.channelCreds.length) {
return false;
}
for (const [index, creds1] of config1.channelCreds.entries()) {
const creds2 = config2.channelCreds[index];
if (creds1.type !== creds2.type) {
return false;
}
if (JSON.stringify(creds1) !== JSON.stringify(creds2)) {
return false;
}
}
return true;
}
/* Structure:
* serverConfig
* single server client
* response validation (for ACK/NACK)
* response parsing (server config in CDS update for LRS)
* authority
* EdsStreamState
* watchers
* cache
* CdsStreamState
* ...
* RdsStreamState
* ...
* LdsStreamState
* ...
* server reference
* update CSDS
*/
interface ClientMapEntry {
serverConfig: XdsServerConfig;
@ -968,16 +962,20 @@ export class XdsClient {
private getOrCreateClientForResource(resourceName: string): XdsSingleServerClient {
const bootstrapInfo = this.getBootstrapInfo();
let serverConfig: XdsServerConfig;
if (resourceName.startsWith('xdstp:')) {
const match = resourceName.match(/xdstp:\/\/([^/]+)\//);
if (!match) {
throw new Error(`Parse error: Resource ${resourceName} has no authority`);
}
const authority = match[1];
if (authority in bootstrapInfo.authorities) {
serverConfig = bootstrapInfo.authorities[authority].xdsServers?.[0] ?? bootstrapInfo.xdsServers[0];
if (EXPERIMENTAL_FEDERATION) {
if (resourceName.startsWith('xdstp:')) {
const match = resourceName.match(/xdstp:\/\/([^/]+)\//);
if (!match) {
throw new Error(`Parse error: Resource ${resourceName} has no authority`);
}
const authority = match[1];
if (authority in bootstrapInfo.authorities) {
serverConfig = bootstrapInfo.authorities[authority].xdsServers?.[0] ?? bootstrapInfo.xdsServers[0];
} else {
throw new Error(`Authority ${authority} in resource ${resourceName} not found in authorities list`);
}
} else {
throw new Error(`Authority ${authority} in resource ${resourceName} not found in authorities list`);
serverConfig = bootstrapInfo.xdsServers[0];
}
} else {
serverConfig = bootstrapInfo.xdsServers[0];
@ -1018,7 +1016,7 @@ export class XdsClient {
}
}
addClusterWatcher(clusterName: string, watcher: Watcher<Cluster__Output>) {
addClusterWatcher(clusterName: string, watcher: Watcher<CdsUpdate>) {
trace('addClusterWatcher(' + clusterName + ')');
try {
const client = this.getOrCreateClientForResource(clusterName);
@ -1029,7 +1027,7 @@ export class XdsClient {
}
}
removeClusterWatcher(clusterName: string, watcher: Watcher<Cluster__Output>) {
removeClusterWatcher(clusterName: string, watcher: Watcher<CdsUpdate>) {
trace('removeClusterWatcher(' + clusterName + ')');
try {
const client = this.getOrCreateClientForResource(clusterName);

View File

@ -0,0 +1,450 @@
/*
* Copyright 2023 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
import { ClientDuplexStream, StatusObject, experimental, loadPackageDefinition, logVerbosity } from "@grpc/grpc-js";
import { XdsResourceType } from "./xds-resource-type/xds-resource-type";
import { XdsResourceName, parseXdsResourceName, xdsResourceNameToString } from "./resources";
import { Node } from "./generated/envoy/config/core/v3/Node";
import { BootstrapInfo, XdsServerConfig, loadBootstrapInfo, serverConfigEqual } from "./xds-bootstrap";
import BackoffTimeout = experimental.BackoffTimeout;
import { DiscoveryRequest } from "./generated/envoy/service/discovery/v3/DiscoveryRequest";
import { DiscoveryResponse__Output } from "./generated/envoy/service/discovery/v3/DiscoveryResponse";
import * as adsTypes from './generated/ads';
import * as lrsTypes from './generated/lrs';
import * as protoLoader from '@grpc/proto-loader';
import { EXPERIMENTAL_FEDERATION } from "./environment";
const TRACER_NAME = 'xds_client';
function trace(text: string): void {
experimental.trace(logVerbosity.DEBUG, TRACER_NAME, text);
}
let loadedProtos: adsTypes.ProtoGrpcType & lrsTypes.ProtoGrpcType | null = null;
function loadAdsProtos(): adsTypes.ProtoGrpcType & lrsTypes.ProtoGrpcType {
if (loadedProtos !== null) {
return loadedProtos;
}
return (loadPackageDefinition(protoLoader
.loadSync(
[
'envoy/service/discovery/v3/ads.proto',
'envoy/service/load_stats/v3/lrs.proto',
],
{
keepCase: true,
longs: String,
enums: String,
defaults: true,
oneofs: true,
json: true,
includeDirs: [
// Paths are relative to src/build
__dirname + '/../../deps/envoy-api/',
__dirname + '/../../deps/xds/',
__dirname + '/../../deps/googleapis/',
__dirname + '/../../deps/protoc-gen-validate/',
],
}
)) as unknown) as adsTypes.ProtoGrpcType & lrsTypes.ProtoGrpcType;
}
const clientVersion = require('../../package.json').version;
export interface ResourceWatcherInterface {
onGenericResourceChanged(resource: object): void;
onError(status: StatusObject): void;
onResourceDoesNotExist(): void;
}
export interface BasicWatcher<UpdateType> {
onResourceChanged(resource: UpdateType): void;
onError(status: StatusObject): void;
onResourceDoesNotExist(): void;
}
export class Watcher<UpdateType> implements ResourceWatcherInterface {
constructor(private internalWatcher: BasicWatcher<UpdateType>) {}
onGenericResourceChanged(resource: object): void {
this.internalWatcher.onResourceChanged(resource as UpdateType);
}
onError(status: StatusObject) {
this.internalWatcher.onError(status);
}
onResourceDoesNotExist() {
this.internalWatcher.onResourceDoesNotExist();
}
}
const RESOURCE_TIMEOUT_MS = 15_000;
class ResourceTimer {
private timer: NodeJS.Timer | null = null;
private resourceSeen = false;
constructor(private callState: AdsCallState, private type: XdsResourceType, private name: XdsResourceName) {}
maybeCancelTimer() {
if (this.timer) {
clearTimeout(this.timer);
this.timer = null;
}
}
markSeen() {
this.resourceSeen = true;
this.maybeCancelTimer();
}
markSubscriptionSendStarted() {
this.maybeStartTimer();
}
private maybeStartTimer() {
if (this.resourceSeen) {
return;
}
if (this.timer) {
return;
}
const authorityState = this.callState.client.xdsClient.authorityStateMap.get(this.name.authority);
if (!authorityState) {
return;
}
const resourceState = authorityState.resourceMap.get(this.type)?.get(this.name.key);
if (resourceState?.cachedResource) {
return;
}
this.timer = setTimeout(() => {
this.onTimer();
}, RESOURCE_TIMEOUT_MS);
}
private onTimer() {
const authorityState = this.callState.client.xdsClient.authorityStateMap.get(this.name.authority);
const resourceState = authorityState?.resourceMap.get(this.type)?.get(this.name.key);
if (!resourceState) {
return;
}
resourceState.meta.clientStatus = 'DOES_NOT_EXIST';
for (const watcher of resourceState.watchers) {
watcher.onResourceDoesNotExist();
}
}
}
type AdsCall = ClientDuplexStream<DiscoveryRequest, DiscoveryResponse__Output>;
interface ResourceTypeState {
nonce?: string;
/**
* authority -> key -> timer
*/
subscribedResources: Map<string, Map<string, ResourceTimer>>;
}
class AdsCallState {
private typeStates: Map<XdsResourceType, ResourceTypeState> = new Map();
constructor(public client: XdsSingleServerClient, private call: AdsCall, private node: Node) {
// Populate subscription map with existing subscriptions
for (const [authority, authorityState] of client.xdsClient.authorityStateMap) {
if (authorityState.client !== client) {
continue;
}
for (const [type, typeMap] of authorityState.resourceMap) {
let typeState = this.typeStates.get(type);
if (!typeState) {
typeState = {
nonce: '',
subscribedResources: new Map()
};
}
const authorityMap: Map<string, ResourceTimer> = new Map();
for (const key of typeMap.keys()) {
const timer = new ResourceTimer(this, type, {authority, key});
authorityMap.set(key, timer);
}
typeState.subscribedResources.set(authority, authorityMap);
}
}
}
hasSubscribedResources(): boolean {
for (const typeState of this.typeStates.values()) {
for (const authorityMap of typeState.subscribedResources.values()) {
if (authorityMap.size > 0) {
return true;
}
}
}
return false;
}
subscribe(type: XdsResourceType, name: XdsResourceName) {
let typeState = this.typeStates.get(type);
if (!typeState) {
typeState = {
nonce: '',
subscribedResources: new Map()
};
}
let authorityMap = typeState.subscribedResources.get(name.authority);
if (!authorityMap) {
authorityMap = new Map();
typeState.subscribedResources.set(name.authority, authorityMap);
}
if (!authorityMap.has(name.key)) {
const timer = new ResourceTimer(this, type, name);
authorityMap.set(name.key, timer);
}
}
unsubscribe(type: XdsResourceType, name: XdsResourceName) {
this.typeStates.get(type)?.subscribedResources.get(name.authority)?.delete(name.key);
}
resourceNamesForRequest(type: XdsResourceType): string[] {
const typeState = this.typeStates.get(type);
if (!typeState) {
return [];
}
const result: string[] = [];
for (const [authority, authorityMap] of typeState.subscribedResources) {
for (const [key, timer] of authorityMap) {
timer.markSubscriptionSendStarted();
result.push(xdsResourceNameToString({authority, key}, type.getTypeUrl()));
}
}
return result;
}
updateNames(type: XdsResourceType) {
this.call.write({
node: this.node,
type_url: type.getTypeUrl(),
response_nonce: this.typeStates.get(type)?.nonce,
resource_names: this.resourceNamesForRequest(type)
});
}
}
class XdsSingleServerClient {
private adsNode: Node;
private lrsNode: Node;
private ignoreResourceDeletion: boolean;
private adsBackoff: BackoffTimeout;
private lrsBackoff: BackoffTimeout;
private adsCallState: AdsCallState | null = null;
/**
* The number of authorities that are using this client. Streams should only
* be started if refcount > 0
*/
private refcount = 0;
/**
* Map of type to latest accepted version string for that type
*/
public resourceTypeVersionMap: Map<XdsResourceType, string> = new Map();
constructor(public xdsClient: XdsClient, bootstrapNode: Node, private xdsServerConfig: XdsServerConfig) {
this.adsBackoff = new BackoffTimeout(() => {
this.maybeStartAdsStream();
});
this.adsBackoff.unref();
this.lrsBackoff = new BackoffTimeout(() => {
this.maybeStartLrsStream();
});
this.lrsBackoff.unref();
this.ignoreResourceDeletion = xdsServerConfig.serverFeatures.includes('ignore_resource_deletion');
const userAgentName = 'gRPC Node Pure JS';
this.adsNode = {
...bootstrapNode,
user_agent_name: userAgentName,
user_agent_version: clientVersion,
client_features: ['envoy.lb.does_not_support_overprovisioning'],
};
this.lrsNode = {
...bootstrapNode,
user_agent_name: userAgentName,
user_agent_version: clientVersion,
client_features: ['envoy.lrs.supports_send_all_clusters'],
};
}
private trace(text: string) {
trace(this.xdsServerConfig.serverUri + ' ' + text);
}
subscribe(type: XdsResourceType, name: XdsResourceName) {
this.adsCallState?.subscribe(type, name);
}
unsubscribe(type: XdsResourceType, name: XdsResourceName) {
this.adsCallState?.unsubscribe(type, name);
}
ref() {
this.refcount += 1;
}
unref() {
this.refcount -= 1;
}
}
interface ClientMapEntry {
serverConfig: XdsServerConfig;
client: XdsSingleServerClient;
}
type ClientResourceStatus = 'REQUESTED' | 'DOES_NOT_EXIST' | 'ACKED' | 'NACKED';
interface ResourceMetadata {
clientStatus: ClientResourceStatus;
updateTime: Date | null;
version: string | null;
failedVersion: string | null;
failedDetails: string | null;
failedUpdateTime: string | null;
}
interface ResourceState {
watchers: Set<ResourceWatcherInterface>;
cachedResource: object | null;
meta: ResourceMetadata;
deletionIgnored: boolean;
}
interface AuthorityState {
client: XdsSingleServerClient;
/**
* type -> key -> state
*/
resourceMap: Map<XdsResourceType, Map<string, ResourceState>>;
}
export class XdsClient {
/**
* authority -> authority state
*/
public authorityStateMap: Map<string, AuthorityState> = new Map();
private clients: ClientMapEntry[] = [];
constructor(private bootstrapInfoOverride?: BootstrapInfo) {}
private getBootstrapInfo() {
if (this.bootstrapInfoOverride) {
return this.bootstrapInfoOverride;
} else {
return loadBootstrapInfo();
}
}
private getOrCreateClient(authority: string): XdsSingleServerClient {
const bootstrapInfo = this.getBootstrapInfo();
let serverConfig: XdsServerConfig;
if (authority === ':old') {
serverConfig = bootstrapInfo.xdsServers[0];
} else {
if (authority in bootstrapInfo.authorities) {
serverConfig = bootstrapInfo.authorities[authority].xdsServers?.[0] ?? bootstrapInfo.xdsServers[0];
} else {
throw new Error(`Authority ${authority} not found in bootstrap authorities list`);
}
}
for (const entry of this.clients) {
if (serverConfigEqual(serverConfig, entry.serverConfig)) {
return entry.client;
}
}
const client = new XdsSingleServerClient(this, bootstrapInfo.node, serverConfig);
this.clients.push({client, serverConfig});
return client;
}
watchResource(type: XdsResourceType, name: string, watcher: ResourceWatcherInterface) {
const resourceName = parseXdsResourceName(name, type.getTypeUrl());
let authorityState = this.authorityStateMap.get(resourceName.authority);
if (!authorityState) {
authorityState = {
client: this.getOrCreateClient(resourceName.authority),
resourceMap: new Map()
};
authorityState.client.ref();
}
let keyMap = authorityState.resourceMap.get(type);
if (!keyMap) {
keyMap = new Map();
authorityState.resourceMap.set(type, keyMap);
}
let entry = keyMap.get(resourceName.key);
let isNewSubscription = false;
if (!entry) {
isNewSubscription = true;
entry = {
watchers: new Set(),
cachedResource: null,
deletionIgnored: false,
meta: {
clientStatus: 'REQUESTED',
updateTime: null,
version: null,
failedVersion: null,
failedUpdateTime: null,
failedDetails: null
}
};
keyMap.set(resourceName.key, entry);
}
entry.watchers.add(watcher);
if (entry.cachedResource) {
process.nextTick(() => {
if (entry?.cachedResource) {
watcher.onGenericResourceChanged(entry.cachedResource);
}
});
}
if (isNewSubscription) {
authorityState.client.subscribe(type, resourceName);
}
}
cancelResourceWatch(type: XdsResourceType, name: string, watcher: ResourceWatcherInterface) {
const resourceName = parseXdsResourceName(name, type.getTypeUrl());
const authorityState = this.authorityStateMap.get(resourceName.authority);
if (!authorityState) {
return;
}
const entry = authorityState.resourceMap.get(type)?.get(resourceName.key);
if (entry) {
entry.watchers.delete(watcher);
if (entry.watchers.size === 0) {
authorityState.resourceMap.get(type)!.delete(resourceName.key);
authorityState.client.unsubscribe(type, resourceName);
if (authorityState.resourceMap.get(type)!.size === 0) {
authorityState.resourceMap.delete(type);
if (authorityState.resourceMap.size === 0) {
authorityState.client.unref();
this.authorityStateMap.delete(resourceName.authority);
}
}
}
}
}
}

View File

@ -0,0 +1,239 @@
/*
* Copyright 2023 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
import { CDS_TYPE_URL, CLUSTER_CONFIG_TYPE_URL, decodeSingleResource } from "../resources";
import { XdsResourceType } from "./xds-resource-type";
import { experimental } from "@grpc/grpc-js";
import { XdsServerConfig } from "../xds-bootstrap";
import { Duration__Output } from "../generated/google/protobuf/Duration";
import { OutlierDetection__Output } from "../generated/envoy/config/cluster/v3/OutlierDetection";
import { EXPERIMENTAL_OUTLIER_DETECTION } from "../environment";
import SuccessRateEjectionConfig = experimental.SuccessRateEjectionConfig;
import FailurePercentageEjectionConfig = experimental.FailurePercentageEjectionConfig;
import { Cluster__Output } from "../generated/envoy/config/cluster/v3/Cluster";
import { UInt32Value__Output } from "../generated/google/protobuf/UInt32Value";
export interface OutlierDetectionUpdate {
intervalMs: number | null;
baseEjectionTimeMs: number | null;
maxEjectionTimeMs: number | null;
maxEjectionPercent: number | null;
successRateConfig: Partial<SuccessRateEjectionConfig> | null;
failurePercentageConfig: Partial<FailurePercentageEjectionConfig> | null;
}
export interface CdsUpdate {
type: 'AGGREGATE' | 'EDS' | 'LOGICAL_DNS';
name: string;
aggregateChildren: string[];
lrsLoadReportingServer?: XdsServerConfig;
maxConcurrentRequests?: number;
edsServiceName?: string;
dnsHostname?: string;
outlierDetectionUpdate?: OutlierDetectionUpdate;
}
function durationToMs(duration: Duration__Output): number {
return (Number(duration.seconds) * 1_000 + duration.nanos / 1_000_000) | 0;
}
function convertOutlierDetectionUpdate(outlierDetection: OutlierDetection__Output | null): OutlierDetectionUpdate | undefined {
if (!EXPERIMENTAL_OUTLIER_DETECTION) {
return undefined;
}
if (!outlierDetection) {
/* No-op outlier detection config, with all fields unset. */
return {
intervalMs: null,
baseEjectionTimeMs: null,
maxEjectionTimeMs: null,
maxEjectionPercent: null,
successRateConfig: null,
failurePercentageConfig: null
};
}
let successRateConfig: Partial<SuccessRateEjectionConfig> | null = null;
/* Success rate ejection is enabled by default, so we only disable it if
* enforcing_success_rate is set and it has the value 0 */
if (!outlierDetection.enforcing_success_rate || outlierDetection.enforcing_success_rate.value > 0) {
successRateConfig = {
enforcement_percentage: outlierDetection.enforcing_success_rate?.value,
minimum_hosts: outlierDetection.success_rate_minimum_hosts?.value,
request_volume: outlierDetection.success_rate_request_volume?.value,
stdev_factor: outlierDetection.success_rate_stdev_factor?.value
};
}
let failurePercentageConfig: Partial<FailurePercentageEjectionConfig> | null = null;
/* Failure percentage ejection is disabled by default, so we only enable it
* if enforcing_failure_percentage is set and it has a value greater than 0 */
if (outlierDetection.enforcing_failure_percentage && outlierDetection.enforcing_failure_percentage.value > 0) {
failurePercentageConfig = {
enforcement_percentage: outlierDetection.enforcing_failure_percentage.value,
minimum_hosts: outlierDetection.failure_percentage_minimum_hosts?.value,
request_volume: outlierDetection.failure_percentage_request_volume?.value,
threshold: outlierDetection.failure_percentage_threshold?.value
}
}
return {
intervalMs: outlierDetection.interval ? durationToMs(outlierDetection.interval) : null,
baseEjectionTimeMs: outlierDetection.base_ejection_time ? durationToMs(outlierDetection.base_ejection_time) : null,
maxEjectionTimeMs: outlierDetection.max_ejection_time ? durationToMs(outlierDetection.max_ejection_time) : null,
maxEjectionPercent : outlierDetection.max_ejection_percent?.value ?? null,
successRateConfig: successRateConfig,
failurePercentageConfig: failurePercentageConfig
};
}
export class ClusterResourceType extends XdsResourceType {
private static singleton: ClusterResourceType = new ClusterResourceType();
private constructor() {
super();
}
static get() {
return ClusterResourceType.singleton;
}
getTypeUrl(): string {
return CDS_TYPE_URL;
}
private validateNonnegativeDuration(duration: Duration__Output | null): boolean {
if (!duration) {
return true;
}
/* The maximum values here come from the official Protobuf documentation:
* https://developers.google.com/protocol-buffers/docs/reference/google.protobuf#google.protobuf.Duration
*/
return Number(duration.seconds) >= 0 &&
Number(duration.seconds) <= 315_576_000_000 &&
duration.nanos >= 0 &&
duration.nanos <= 999_999_999;
}
private validatePercentage(percentage: UInt32Value__Output | null): boolean {
if (!percentage) {
return true;
}
return percentage.value >=0 && percentage.value <= 100;
}
private validateResource(message: Cluster__Output): CdsUpdate | null {
if (message.lb_policy !== 'ROUND_ROBIN') {
return null;
}
if (message.lrs_server) {
if (!message.lrs_server.self) {
return null;
}
}
if (EXPERIMENTAL_OUTLIER_DETECTION) {
if (message.outlier_detection) {
if (!this.validateNonnegativeDuration(message.outlier_detection.interval)) {
return null;
}
if (!this.validateNonnegativeDuration(message.outlier_detection.base_ejection_time)) {
return null;
}
if (!this.validateNonnegativeDuration(message.outlier_detection.max_ejection_time)) {
return null;
}
if (!this.validatePercentage(message.outlier_detection.max_ejection_percent)) {
return null;
}
if (!this.validatePercentage(message.outlier_detection.enforcing_success_rate)) {
return null;
}
if (!this.validatePercentage(message.outlier_detection.failure_percentage_threshold)) {
return null;
}
if (!this.validatePercentage(message.outlier_detection.enforcing_failure_percentage)) {
return null;
}
}
}
if (message.cluster_discovery_type === 'cluster_type') {
if (!(message.cluster_type?.typed_config && message.cluster_type.typed_config.type_url === CLUSTER_CONFIG_TYPE_URL)) {
return null;
}
const clusterConfig = decodeSingleResource(CLUSTER_CONFIG_TYPE_URL, message.cluster_type.typed_config.value);
if (clusterConfig.clusters.length === 0) {
return null;
}
return {
type: 'AGGREGATE',
name: message.name,
aggregateChildren: clusterConfig.clusters,
outlierDetectionUpdate: convertOutlierDetectionUpdate(null)
};
} else {
let maxConcurrentRequests: number | undefined = undefined;
for (const threshold of message.circuit_breakers?.thresholds ?? []) {
if (threshold.priority === 'DEFAULT') {
maxConcurrentRequests = threshold.max_requests?.value;
}
}
if (message.type === 'EDS') {
if (!message.eds_cluster_config?.eds_config?.ads) {
return null;
}
return {
type: 'EDS',
name: message.name,
aggregateChildren: [],
maxConcurrentRequests: maxConcurrentRequests,
edsServiceName: message.eds_cluster_config.service_name === '' ? undefined : message.eds_cluster_config.service_name,
lrsLoadReportingServer: message.lrs_server ? this.xdsServer : undefined,
outlierDetectionUpdate: convertOutlierDetectionUpdate(message.outlier_detection)
}
} else if (message.type === 'LOGICAL_DNS') {
if (!message.load_assignment) {
return null;
}
if (message.load_assignment.endpoints.length !== 1) {
return null;
}
if (message.load_assignment.endpoints[0].lb_endpoints.length !== 1) {
return null;
}
const socketAddress = message.load_assignment.endpoints[0].lb_endpoints[0].endpoint?.address?.socket_address;
if (!socketAddress) {
return null;
}
if (socketAddress.address === '') {
return null;
}
if (socketAddress.port_specifier !== 'port_value') {
return null;
}
return {
type: 'LOGICAL_DNS',
name: message.name,
aggregateChildren: [],
maxConcurrentRequests: maxConcurrentRequests,
dnsHostname: `${socketAddress.address}:${socketAddress.port_value}`,
lrsLoadReportingServer: message.lrs_server ? this.xdsServer : undefined,
outlierDetectionUpdate: convertOutlierDetectionUpdate(message.outlier_detection)
};
}
}
return null;
}
}

View File

@ -0,0 +1,129 @@
import { experimental, logVerbosity } from "@grpc/grpc-js";
import { ClusterLoadAssignment__Output } from "../generated/envoy/config/endpoint/v3/ClusterLoadAssignment";
import { XdsDecodeResult, XdsResourceType } from "./xds-resource-type";
import { Locality__Output } from "../generated/envoy/config/core/v3/Locality";
import { SocketAddress__Output } from "../generated/envoy/config/core/v3/SocketAddress";
import { isIPv4, isIPv6 } from "net";
import { Any__Output } from "../generated/google/protobuf/Any";
import { EDS_TYPE_URL, decodeSingleResource } from "../resources";
import { Watcher, XdsClient } from "../xds-client2";
const TRACER_NAME = 'xds_client';
const UINT32_MAX = 0xFFFFFFFF;
function trace(text: string): void {
experimental.trace(logVerbosity.DEBUG, TRACER_NAME, text);
}
function localitiesEqual(a: Locality__Output, b: Locality__Output) {
return a.region === b.region && a.sub_zone === b.sub_zone && a.zone === b.zone;
}
function addressesEqual(a: SocketAddress__Output, b: SocketAddress__Output) {
return a.address === b.address && a.port_value === b.port_value;
}
export class EndpointResourceType extends XdsResourceType {
private static singleton: EndpointResourceType = new EndpointResourceType();
private constructor() {
super();
}
static get() {
return EndpointResourceType.singleton;
}
getTypeUrl(): string {
return EDS_TYPE_URL;
}
private validateResource(message: ClusterLoadAssignment__Output): ClusterLoadAssignment__Output | null {
const seenLocalities: {locality: Locality__Output, priority: number}[] = [];
const seenAddresses: SocketAddress__Output[] = [];
const priorityTotalWeights: Map<number, number> = new Map();
for (const endpoint of message.endpoints) {
if (!endpoint.locality) {
trace('EDS validation: endpoint locality unset');
return null;
}
for (const {locality, priority} of seenLocalities) {
if (localitiesEqual(endpoint.locality, locality) && endpoint.priority === priority) {
trace('EDS validation: endpoint locality duplicated: ' + JSON.stringify(locality) + ', priority=' + priority);
return null;
}
}
seenLocalities.push({locality: endpoint.locality, priority: endpoint.priority});
for (const lb of endpoint.lb_endpoints) {
const socketAddress = lb.endpoint?.address?.socket_address;
if (!socketAddress) {
trace('EDS validation: endpoint socket_address not set');
return null;
}
if (socketAddress.port_specifier !== 'port_value') {
trace('EDS validation: socket_address.port_specifier !== "port_value"');
return null;
}
if (!(isIPv4(socketAddress.address) || isIPv6(socketAddress.address))) {
trace('EDS validation: address not a valid IPv4 or IPv6 address: ' + socketAddress.address);
return null;
}
for (const address of seenAddresses) {
if (addressesEqual(socketAddress, address)) {
trace('EDS validation: duplicate address seen: ' + address);
return null;
}
}
seenAddresses.push(socketAddress);
}
priorityTotalWeights.set(endpoint.priority, (priorityTotalWeights.get(endpoint.priority) ?? 0) + (endpoint.load_balancing_weight?.value ?? 0));
}
for (const totalWeight of priorityTotalWeights.values()) {
if (totalWeight > UINT32_MAX) {
trace('EDS validation: total weight > UINT32_MAX')
return null;
}
}
for (const priority of priorityTotalWeights.keys()) {
if (priority > 0 && !priorityTotalWeights.has(priority - 1)) {
trace('EDS validation: priorities not contiguous');
return null;
}
}
return message;
}
decode(resource: Any__Output): XdsDecodeResult {
if (resource.type_url !== EDS_TYPE_URL) {
throw new Error(
`ADS Error: Invalid resource type ${resource.type_url}, expected ${EDS_TYPE_URL}`
);
}
const message = decodeSingleResource(EDS_TYPE_URL, resource.value);
const validatedMessage = this.validateResource(message);
if (validatedMessage) {
return {
name: validatedMessage.cluster_name,
value: validatedMessage
};
} else {
return {
name: message.cluster_name,
error: 'Listener message validation failed'
};
}
}
allResourcesRequiredInSotW(): boolean {
return true;
}
static startWatch(client: XdsClient, name: string, watcher: Watcher<ClusterLoadAssignment__Output>) {
client.watchResource(EndpointResourceType.get(), name, watcher);
}
static cancelWatch(client: XdsClient, name: string, watcher: Watcher<ClusterLoadAssignment__Output>) {
client.cancelResourceWatch(EndpointResourceType.get(), name, watcher);
}
}

View File

@ -0,0 +1,134 @@
/*
* Copyright 2023 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
import { logVerbosity, experimental } from "@grpc/grpc-js";
import { EXPERIMENTAL_FAULT_INJECTION } from "../environment";
import { Listener__Output } from "../generated/envoy/config/listener/v3/Listener";
import { Any__Output } from "../generated/google/protobuf/Any";
import { HTTP_CONNECTION_MANGER_TYPE_URL, LDS_TYPE_URL, decodeSingleResource } from "../resources";
import { XdsDecodeResult, XdsResourceType } from "./xds-resource-type";
import { getTopLevelFilterUrl, validateTopLevelFilter } from "../http-filter";
import { RouteConfigurationResourceType } from "./route-config-resource-type";
import { Watcher, XdsClient } from "../xds-client2";
const TRACER_NAME = 'xds_client';
function trace(text: string): void {
experimental.trace(logVerbosity.DEBUG, TRACER_NAME, text);
}
const ROUTER_FILTER_URL = 'type.googleapis.com/envoy.extensions.filters.http.router.v3.Router';
export class ListenerResourceType extends XdsResourceType {
private static singleton: ListenerResourceType = new ListenerResourceType();
private constructor() {
super();
}
static get() {
return ListenerResourceType.singleton;
}
getTypeUrl(): string {
return LDS_TYPE_URL;
}
private validateResource(message: Listener__Output): Listener__Output | null {
if (
!(
message.api_listener?.api_listener &&
message.api_listener.api_listener.type_url === HTTP_CONNECTION_MANGER_TYPE_URL
)
) {
return null;
}
const httpConnectionManager = decodeSingleResource(HTTP_CONNECTION_MANGER_TYPE_URL, message.api_listener!.api_listener.value);
if (EXPERIMENTAL_FAULT_INJECTION) {
const filterNames = new Set<string>();
for (const [index, httpFilter] of httpConnectionManager.http_filters.entries()) {
if (filterNames.has(httpFilter.name)) {
trace('LDS response validation failed: duplicate HTTP filter name ' + httpFilter.name);
return null;
}
filterNames.add(httpFilter.name);
if (!validateTopLevelFilter(httpFilter)) {
trace('LDS response validation failed: ' + httpFilter.name + ' filter validation failed');
return null;
}
/* Validate that the last filter, and only the last filter, is the
* router filter. */
const filterUrl = getTopLevelFilterUrl(httpFilter.typed_config!)
if (index < httpConnectionManager.http_filters.length - 1) {
if (filterUrl === ROUTER_FILTER_URL) {
trace('LDS response validation failed: router filter is before end of list');
return null;
}
} else {
if (filterUrl !== ROUTER_FILTER_URL) {
trace('LDS response validation failed: final filter is ' + filterUrl);
return null;
}
}
}
}
switch (httpConnectionManager.route_specifier) {
case 'rds':
if (!httpConnectionManager.rds?.config_source?.ads) {
return null;
}
return message;
case 'route_config':
if (!RouteConfigurationResourceType.get().validateResource(httpConnectionManager.route_config!)) {
return null;
}
return message;
}
return null;
}
decode(resource: Any__Output): XdsDecodeResult {
if (resource.type_url !== LDS_TYPE_URL) {
throw new Error(
`ADS Error: Invalid resource type ${resource.type_url}, expected ${LDS_TYPE_URL}`
);
}
const message = decodeSingleResource(LDS_TYPE_URL, resource.value);
const validatedMessage = this.validateResource(message);
if (validatedMessage) {
return {
name: validatedMessage.name,
value: validatedMessage
};
} else {
return {
name: message.name,
error: 'Listener message validation failed'
};
}
}
allResourcesRequiredInSotW(): boolean {
return true;
}
static startWatch(client: XdsClient, name: string, watcher: Watcher<Listener__Output>) {
client.watchResource(ListenerResourceType.get(), name, watcher);
}
static cancelWatch(client: XdsClient, name: string, watcher: Watcher<Listener__Output>) {
client.cancelResourceWatch(ListenerResourceType.get(), name, watcher);
}
}

View File

@ -0,0 +1,196 @@
/*
* Copyright 2023 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
import { EXPERIMENTAL_FAULT_INJECTION, EXPERIMENTAL_RETRY } from "../environment";
import { RetryPolicy__Output } from "../generated/envoy/config/route/v3/RetryPolicy";
import { RouteConfiguration__Output } from "../generated/envoy/config/route/v3/RouteConfiguration";
import { Any__Output } from "../generated/google/protobuf/Any";
import { Duration__Output } from "../generated/google/protobuf/Duration";
import { validateOverrideFilter } from "../http-filter";
import { RDS_TYPE_URL, decodeSingleResource } from "../resources";
import { Watcher, XdsClient } from "../xds-client2";
import { XdsDecodeResult, XdsResourceType } from "./xds-resource-type";
const SUPPORTED_PATH_SPECIFIERS = ['prefix', 'path', 'safe_regex'];
const SUPPPORTED_HEADER_MATCH_SPECIFIERS = [
'exact_match',
'safe_regex_match',
'range_match',
'present_match',
'prefix_match',
'suffix_match'];
const SUPPORTED_CLUSTER_SPECIFIERS = ['cluster', 'weighted_clusters', 'cluster_header'];
const UINT32_MAX = 0xFFFFFFFF;
function durationToMs(duration: Duration__Output | null): number | null {
if (duration === null) {
return null;
}
return (Number.parseInt(duration.seconds) * 1000 + duration.nanos / 1_000_000) | 0;
}
export class RouteConfigurationResourceType extends XdsResourceType {
private static singleton: RouteConfigurationResourceType = new RouteConfigurationResourceType();
private constructor() {
super();
}
static get() {
return RouteConfigurationResourceType.singleton;
}
getTypeUrl(): string {
return RDS_TYPE_URL;
}
private validateRetryPolicy(policy: RetryPolicy__Output | null): boolean {
if (policy === null) {
return true;
}
const numRetries = policy.num_retries?.value ?? 1
if (numRetries < 1) {
return false;
}
if (policy.retry_back_off) {
if (!policy.retry_back_off.base_interval) {
return false;
}
const baseInterval = durationToMs(policy.retry_back_off.base_interval)!;
const maxInterval = durationToMs(policy.retry_back_off.max_interval) ?? (10 * baseInterval);
if (!(maxInterval >= baseInterval) && (baseInterval > 0)) {
return false;
}
}
return true;
}
public validateResource(message: RouteConfiguration__Output): RouteConfiguration__Output | null {
// https://github.com/grpc/proposal/blob/master/A28-xds-traffic-splitting-and-routing.md#response-validation
for (const virtualHost of message.virtual_hosts) {
for (const domainPattern of virtualHost.domains) {
const starIndex = domainPattern.indexOf('*');
const lastStarIndex = domainPattern.lastIndexOf('*');
// A domain pattern can have at most one wildcard *
if (starIndex !== lastStarIndex) {
return null;
}
// A wildcard * can either be absent or at the beginning or end of the pattern
if (!(starIndex === -1 || starIndex === 0 || starIndex === domainPattern.length - 1)) {
return null;
}
}
if (EXPERIMENTAL_FAULT_INJECTION) {
for (const filterConfig of Object.values(virtualHost.typed_per_filter_config ?? {})) {
if (!validateOverrideFilter(filterConfig)) {
return null;
}
}
}
if (EXPERIMENTAL_RETRY) {
if (!this.validateRetryPolicy(virtualHost.retry_policy)) {
return null;
}
}
for (const route of virtualHost.routes) {
const match = route.match;
if (!match) {
return null;
}
if (SUPPORTED_PATH_SPECIFIERS.indexOf(match.path_specifier) < 0) {
return null;
}
for (const headers of match.headers) {
if (SUPPPORTED_HEADER_MATCH_SPECIFIERS.indexOf(headers.header_match_specifier) < 0) {
return null;
}
}
if (route.action !== 'route') {
return null;
}
if ((route.route === undefined) || (route.route === null) || SUPPORTED_CLUSTER_SPECIFIERS.indexOf(route.route.cluster_specifier) < 0) {
return null;
}
if (EXPERIMENTAL_FAULT_INJECTION) {
for (const [name, filterConfig] of Object.entries(route.typed_per_filter_config ?? {})) {
if (!validateOverrideFilter(filterConfig)) {
return null;
}
}
}
if (EXPERIMENTAL_RETRY) {
if (!this.validateRetryPolicy(route.route.retry_policy)) {
return null;
}
}
if (route.route!.cluster_specifier === 'weighted_clusters') {
let weightSum = 0;
for (const clusterWeight of route.route.weighted_clusters!.clusters) {
weightSum += clusterWeight.weight?.value ?? 0;
}
if (weightSum === 0 || weightSum > UINT32_MAX) {
return null;
}
if (EXPERIMENTAL_FAULT_INJECTION) {
for (const weightedCluster of route.route!.weighted_clusters!.clusters) {
for (const filterConfig of Object.values(weightedCluster.typed_per_filter_config ?? {})) {
if (!validateOverrideFilter(filterConfig)) {
return null;
}
}
}
}
}
}
}
return message;
}
decode(resource: Any__Output): XdsDecodeResult {
if (resource.type_url !== RDS_TYPE_URL) {
throw new Error(
`ADS Error: Invalid resource type ${resource.type_url}, expected ${RDS_TYPE_URL}`
);
}
const message = decodeSingleResource(RDS_TYPE_URL, resource.value);
const validatedMessage = this.validateResource(message);
if (validatedMessage) {
return {
name: validatedMessage.name,
value: validatedMessage
};
} else {
return {
name: message.name,
error: 'Listener message validation failed'
};
}
}
allResourcesRequiredInSotW(): boolean {
return false;
}
static startWatch(client: XdsClient, name: string, watcher: Watcher<RouteConfiguration__Output>) {
client.watchResource(RouteConfigurationResourceType.get(), name, watcher);
}
static cancelWatch(client: XdsClient, name: string, watcher: Watcher<RouteConfiguration__Output>) {
client.cancelResourceWatch(RouteConfigurationResourceType.get(), name, watcher);
}
}

View File

@ -0,0 +1,32 @@
/*
* Copyright 2023 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
import { Any__Output } from "../generated/google/protobuf/Any";
export interface XdsDecodeResult {
name: string;
value?: object;
error?: string;
}
export abstract class XdsResourceType {
abstract getTypeUrl(): string;
abstract decode(resource: Any__Output): XdsDecodeResult;
abstract allResourcesRequiredInSotW(): boolean;
}

View File

@ -15,14 +15,88 @@
*
*/
import { FailurePercentageEjectionConfig, SuccessRateEjectionConfig } from "@grpc/grpc-js/build/src/load-balancer-outlier-detection";
import { EXPERIMENTAL_OUTLIER_DETECTION } from "../environment";
import { Cluster__Output } from "../generated/envoy/config/cluster/v3/Cluster";
import { OutlierDetection__Output } from "../generated/envoy/config/cluster/v3/OutlierDetection";
import { Duration__Output } from "../generated/google/protobuf/Duration";
import { UInt32Value__Output } from "../generated/google/protobuf/UInt32Value";
import { CLUSTER_CONFIG_TYPE_URL, decodeSingleResource } from "../resources";
import { XdsServerConfig } from "../xds-bootstrap";
import { BaseXdsStreamState, XdsStreamState } from "./xds-stream-state";
export class CdsState extends BaseXdsStreamState<Cluster__Output> implements XdsStreamState<Cluster__Output> {
export interface OutlierDetectionUpdate {
intervalMs: number | null;
baseEjectionTimeMs: number | null;
maxEjectionTimeMs: number | null;
maxEjectionPercent: number | null;
successRateConfig: Partial<SuccessRateEjectionConfig> | null;
failurePercentageConfig: Partial<FailurePercentageEjectionConfig> | null;
}
export interface CdsUpdate {
type: 'AGGREGATE' | 'EDS' | 'LOGICAL_DNS';
name: string;
aggregateChildren: string[];
lrsLoadReportingServer?: XdsServerConfig;
maxConcurrentRequests?: number;
edsServiceName?: string;
dnsHostname?: string;
outlierDetectionUpdate?: OutlierDetectionUpdate;
}
function durationToMs(duration: Duration__Output): number {
return (Number(duration.seconds) * 1_000 + duration.nanos / 1_000_000) | 0;
}
function convertOutlierDetectionUpdate(outlierDetection: OutlierDetection__Output | null): OutlierDetectionUpdate | undefined {
if (!EXPERIMENTAL_OUTLIER_DETECTION) {
return undefined;
}
if (!outlierDetection) {
/* No-op outlier detection config, with all fields unset. */
return {
intervalMs: null,
baseEjectionTimeMs: null,
maxEjectionTimeMs: null,
maxEjectionPercent: null,
successRateConfig: null,
failurePercentageConfig: null
};
}
let successRateConfig: Partial<SuccessRateEjectionConfig> | null = null;
/* Success rate ejection is enabled by default, so we only disable it if
* enforcing_success_rate is set and it has the value 0 */
if (!outlierDetection.enforcing_success_rate || outlierDetection.enforcing_success_rate.value > 0) {
successRateConfig = {
enforcement_percentage: outlierDetection.enforcing_success_rate?.value,
minimum_hosts: outlierDetection.success_rate_minimum_hosts?.value,
request_volume: outlierDetection.success_rate_request_volume?.value,
stdev_factor: outlierDetection.success_rate_stdev_factor?.value
};
}
let failurePercentageConfig: Partial<FailurePercentageEjectionConfig> | null = null;
/* Failure percentage ejection is disabled by default, so we only enable it
* if enforcing_failure_percentage is set and it has a value greater than 0 */
if (outlierDetection.enforcing_failure_percentage && outlierDetection.enforcing_failure_percentage.value > 0) {
failurePercentageConfig = {
enforcement_percentage: outlierDetection.enforcing_failure_percentage.value,
minimum_hosts: outlierDetection.failure_percentage_minimum_hosts?.value,
request_volume: outlierDetection.failure_percentage_request_volume?.value,
threshold: outlierDetection.failure_percentage_threshold?.value
}
}
return {
intervalMs: outlierDetection.interval ? durationToMs(outlierDetection.interval) : null,
baseEjectionTimeMs: outlierDetection.base_ejection_time ? durationToMs(outlierDetection.base_ejection_time) : null,
maxEjectionTimeMs: outlierDetection.max_ejection_time ? durationToMs(outlierDetection.max_ejection_time) : null,
maxEjectionPercent : outlierDetection.max_ejection_percent?.value ?? null,
successRateConfig: successRateConfig,
failurePercentageConfig: failurePercentageConfig
};
}
export class CdsState extends BaseXdsStreamState<Cluster__Output, CdsUpdate> implements XdsStreamState<Cluster__Output, CdsUpdate> {
protected isStateOfTheWorld(): boolean {
return true;
}
@ -53,75 +127,105 @@ export class CdsState extends BaseXdsStreamState<Cluster__Output> implements Xds
return percentage.value >=0 && percentage.value <= 100;
}
public validateResponse(message: Cluster__Output): boolean {
if (message.cluster_discovery_type === 'cluster_type') {
if (!(message.cluster_type?.typed_config && message.cluster_type.typed_config.type_url === CLUSTER_CONFIG_TYPE_URL)) {
return false;
}
const clusterConfig = decodeSingleResource(CLUSTER_CONFIG_TYPE_URL, message.cluster_type.typed_config.value);
if (clusterConfig.clusters.length === 0) {
return false;
}
} else {
if (message.type === 'EDS') {
if (!message.eds_cluster_config?.eds_config?.ads) {
return false;
}
} else if (message.type === 'LOGICAL_DNS') {
if (!message.load_assignment) {
return false;
}
if (message.load_assignment.endpoints.length !== 1) {
return false;
}
if (message.load_assignment.endpoints[0].lb_endpoints.length !== 1) {
return false;
}
const socketAddress = message.load_assignment.endpoints[0].lb_endpoints[0].endpoint?.address?.socket_address;
if (!socketAddress) {
return false;
}
if (socketAddress.address === '') {
return false;
}
if (socketAddress.port_specifier !== 'port_value') {
return false;
}
}
}
public validateResponse(message: Cluster__Output): CdsUpdate | null {
if (message.lb_policy !== 'ROUND_ROBIN') {
return false;
return null;
}
if (message.lrs_server) {
if (!message.lrs_server.self) {
return false;
return null;
}
}
if (EXPERIMENTAL_OUTLIER_DETECTION) {
if (message.outlier_detection) {
if (!this.validateNonnegativeDuration(message.outlier_detection.interval)) {
return false;
return null;
}
if (!this.validateNonnegativeDuration(message.outlier_detection.base_ejection_time)) {
return false;
return null;
}
if (!this.validateNonnegativeDuration(message.outlier_detection.max_ejection_time)) {
return false;
return null;
}
if (!this.validatePercentage(message.outlier_detection.max_ejection_percent)) {
return false;
return null;
}
if (!this.validatePercentage(message.outlier_detection.enforcing_success_rate)) {
return false;
return null;
}
if (!this.validatePercentage(message.outlier_detection.failure_percentage_threshold)) {
return false;
return null;
}
if (!this.validatePercentage(message.outlier_detection.enforcing_failure_percentage)) {
return false;
return null;
}
}
}
return true;
if (message.cluster_discovery_type === 'cluster_type') {
if (!(message.cluster_type?.typed_config && message.cluster_type.typed_config.type_url === CLUSTER_CONFIG_TYPE_URL)) {
return null;
}
const clusterConfig = decodeSingleResource(CLUSTER_CONFIG_TYPE_URL, message.cluster_type.typed_config.value);
if (clusterConfig.clusters.length === 0) {
return null;
}
return {
type: 'AGGREGATE',
name: message.name,
aggregateChildren: clusterConfig.clusters,
outlierDetectionUpdate: convertOutlierDetectionUpdate(null)
};
} else {
let maxConcurrentRequests: number | undefined = undefined;
for (const threshold of message.circuit_breakers?.thresholds ?? []) {
if (threshold.priority === 'DEFAULT') {
maxConcurrentRequests = threshold.max_requests?.value;
}
}
if (message.type === 'EDS') {
if (!message.eds_cluster_config?.eds_config?.ads) {
return null;
}
return {
type: 'EDS',
name: message.name,
aggregateChildren: [],
maxConcurrentRequests: maxConcurrentRequests,
edsServiceName: message.eds_cluster_config.service_name === '' ? undefined : message.eds_cluster_config.service_name,
lrsLoadReportingServer: message.lrs_server ? this.xdsServer : undefined,
outlierDetectionUpdate: convertOutlierDetectionUpdate(message.outlier_detection)
}
} else if (message.type === 'LOGICAL_DNS') {
if (!message.load_assignment) {
return null;
}
if (message.load_assignment.endpoints.length !== 1) {
return null;
}
if (message.load_assignment.endpoints[0].lb_endpoints.length !== 1) {
return null;
}
const socketAddress = message.load_assignment.endpoints[0].lb_endpoints[0].endpoint?.address?.socket_address;
if (!socketAddress) {
return null;
}
if (socketAddress.address === '') {
return null;
}
if (socketAddress.port_specifier !== 'port_value') {
return null;
}
return {
type: 'LOGICAL_DNS',
name: message.name,
aggregateChildren: [],
maxConcurrentRequests: maxConcurrentRequests,
dnsHostname: `${socketAddress.address}:${socketAddress.port_value}`,
lrsLoadReportingServer: message.lrs_server ? this.xdsServer : undefined,
outlierDetectionUpdate: convertOutlierDetectionUpdate(message.outlier_detection)
};
}
}
return null;
}
}

View File

@ -39,7 +39,7 @@ function addressesEqual(a: SocketAddress__Output, b: SocketAddress__Output) {
return a.address === b.address && a.port_value === b.port_value;
}
export class EdsState extends BaseXdsStreamState<ClusterLoadAssignment__Output> implements XdsStreamState<ClusterLoadAssignment__Output> {
export class EdsState extends BaseXdsStreamState<ClusterLoadAssignment__Output, ClusterLoadAssignment__Output> implements XdsStreamState<ClusterLoadAssignment__Output, ClusterLoadAssignment__Output> {
protected getResourceName(resource: ClusterLoadAssignment__Output): string {
return resource.cluster_name;
}
@ -62,12 +62,12 @@ export class EdsState extends BaseXdsStreamState<ClusterLoadAssignment__Output>
for (const endpoint of message.endpoints) {
if (!endpoint.locality) {
trace('EDS validation: endpoint locality unset');
return false;
return null;
}
for (const {locality, priority} of seenLocalities) {
if (localitiesEqual(endpoint.locality, locality) && endpoint.priority === priority) {
trace('EDS validation: endpoint locality duplicated: ' + JSON.stringify(locality) + ', priority=' + priority);
return false;
return null;
}
}
seenLocalities.push({locality: endpoint.locality, priority: endpoint.priority});
@ -75,20 +75,20 @@ export class EdsState extends BaseXdsStreamState<ClusterLoadAssignment__Output>
const socketAddress = lb.endpoint?.address?.socket_address;
if (!socketAddress) {
trace('EDS validation: endpoint socket_address not set');
return false;
return null;
}
if (socketAddress.port_specifier !== 'port_value') {
trace('EDS validation: socket_address.port_specifier !== "port_value"');
return false;
return null;
}
if (!(isIPv4(socketAddress.address) || isIPv6(socketAddress.address))) {
trace('EDS validation: address not a valid IPv4 or IPv6 address: ' + socketAddress.address);
return false;
return null;
}
for (const address of seenAddresses) {
if (addressesEqual(socketAddress, address)) {
trace('EDS validation: duplicate address seen: ' + address);
return false;
return null;
}
}
seenAddresses.push(socketAddress);
@ -98,15 +98,15 @@ export class EdsState extends BaseXdsStreamState<ClusterLoadAssignment__Output>
for (const totalWeight of priorityTotalWeights.values()) {
if (totalWeight > UINT32_MAX) {
trace('EDS validation: total weight > UINT32_MAX')
return false;
return null;
}
}
for (const priority of priorityTotalWeights.keys()) {
if (priority > 0 && !priorityTotalWeights.has(priority - 1)) {
trace('EDS validation: priorities not contiguous');
return false;
return null;
}
}
return true;
return message;
}
}

View File

@ -22,6 +22,7 @@ import { BaseXdsStreamState, XdsStreamState } from "./xds-stream-state";
import { decodeSingleResource, HTTP_CONNECTION_MANGER_TYPE_URL } from '../resources';
import { getTopLevelFilterUrl, validateTopLevelFilter } from '../http-filter';
import { EXPERIMENTAL_FAULT_INJECTION } from '../environment';
import { XdsServerConfig } from "../xds-bootstrap";
const TRACER_NAME = 'xds_client';
@ -31,7 +32,7 @@ function trace(text: string): void {
const ROUTER_FILTER_URL = 'type.googleapis.com/envoy.extensions.filters.http.router.v3.Router';
export class LdsState extends BaseXdsStreamState<Listener__Output> implements XdsStreamState<Listener__Output> {
export class LdsState extends BaseXdsStreamState<Listener__Output, Listener__Output> implements XdsStreamState<Listener__Output, Listener__Output> {
protected getResourceName(resource: Listener__Output): string {
return resource.name;
}
@ -42,18 +43,18 @@ export class LdsState extends BaseXdsStreamState<Listener__Output> implements Xd
return true;
}
constructor(private rdsState: RdsState, updateResourceNames: () => void) {
super(updateResourceNames);
constructor(xdsServer: XdsServerConfig, private rdsState: RdsState, updateResourceNames: () => void) {
super(xdsServer, updateResourceNames);
}
public validateResponse(message: Listener__Output): boolean {
public validateResponse(message: Listener__Output): Listener__Output | null {
if (
!(
message.api_listener?.api_listener &&
message.api_listener.api_listener.type_url === HTTP_CONNECTION_MANGER_TYPE_URL
)
) {
return false;
return null;
}
const httpConnectionManager = decodeSingleResource(HTTP_CONNECTION_MANGER_TYPE_URL, message.api_listener!.api_listener.value);
if (EXPERIMENTAL_FAULT_INJECTION) {
@ -61,12 +62,12 @@ export class LdsState extends BaseXdsStreamState<Listener__Output> implements Xd
for (const [index, httpFilter] of httpConnectionManager.http_filters.entries()) {
if (filterNames.has(httpFilter.name)) {
trace('LDS response validation failed: duplicate HTTP filter name ' + httpFilter.name);
return false;
return null;
}
filterNames.add(httpFilter.name);
if (!validateTopLevelFilter(httpFilter)) {
trace('LDS response validation failed: ' + httpFilter.name + ' filter validation failed');
return false;
return null;
}
/* Validate that the last filter, and only the last filter, is the
* router filter. */
@ -74,22 +75,28 @@ export class LdsState extends BaseXdsStreamState<Listener__Output> implements Xd
if (index < httpConnectionManager.http_filters.length - 1) {
if (filterUrl === ROUTER_FILTER_URL) {
trace('LDS response validation failed: router filter is before end of list');
return false;
return null;
}
} else {
if (filterUrl !== ROUTER_FILTER_URL) {
trace('LDS response validation failed: final filter is ' + filterUrl);
return false;
return null;
}
}
}
}
switch (httpConnectionManager.route_specifier) {
case 'rds':
return !!httpConnectionManager.rds?.config_source?.ads;
if (!httpConnectionManager.rds?.config_source?.ads) {
return null;
}
return message;
case 'route_config':
return this.rdsState.validateResponse(httpConnectionManager.route_config!);
if (!this.rdsState.validateResponse(httpConnectionManager.route_config!)) {
return null;
}
return message;
}
return false;
return null;
}
}

View File

@ -41,7 +41,7 @@ function durationToMs(duration: Duration__Output | null): number | null {
return (Number.parseInt(duration.seconds) * 1000 + duration.nanos / 1_000_000) | 0;
}
export class RdsState extends BaseXdsStreamState<RouteConfiguration__Output> implements XdsStreamState<RouteConfiguration__Output> {
export class RdsState extends BaseXdsStreamState<RouteConfiguration__Output, RouteConfiguration__Output> implements XdsStreamState<RouteConfiguration__Output, RouteConfiguration__Output> {
protected isStateOfTheWorld(): boolean {
return false;
}
@ -73,7 +73,7 @@ export class RdsState extends BaseXdsStreamState<RouteConfiguration__Output> imp
return true;
}
validateResponse(message: RouteConfiguration__Output): boolean {
validateResponse(message: RouteConfiguration__Output) {
// https://github.com/grpc/proposal/blob/master/A28-xds-traffic-splitting-and-routing.md#response-validation
for (const virtualHost of message.virtual_hosts) {
for (const domainPattern of virtualHost.domains) {
@ -81,54 +81,54 @@ export class RdsState extends BaseXdsStreamState<RouteConfiguration__Output> imp
const lastStarIndex = domainPattern.lastIndexOf('*');
// A domain pattern can have at most one wildcard *
if (starIndex !== lastStarIndex) {
return false;
return null;
}
// A wildcard * can either be absent or at the beginning or end of the pattern
if (!(starIndex === -1 || starIndex === 0 || starIndex === domainPattern.length - 1)) {
return false;
return null;
}
}
if (EXPERIMENTAL_FAULT_INJECTION) {
for (const filterConfig of Object.values(virtualHost.typed_per_filter_config ?? {})) {
if (!validateOverrideFilter(filterConfig)) {
return false;
return null;
}
}
}
if (EXPERIMENTAL_RETRY) {
if (!this.validateRetryPolicy(virtualHost.retry_policy)) {
return false;
return null;
}
}
for (const route of virtualHost.routes) {
const match = route.match;
if (!match) {
return false;
return null;
}
if (SUPPORTED_PATH_SPECIFIERS.indexOf(match.path_specifier) < 0) {
return false;
return null;
}
for (const headers of match.headers) {
if (SUPPPORTED_HEADER_MATCH_SPECIFIERS.indexOf(headers.header_match_specifier) < 0) {
return false;
return null;
}
}
if (route.action !== 'route') {
return false;
return null;
}
if ((route.route === undefined) || (route.route === null) || SUPPORTED_CLUSTER_SPECIFIERS.indexOf(route.route.cluster_specifier) < 0) {
return false;
return null;
}
if (EXPERIMENTAL_FAULT_INJECTION) {
for (const [name, filterConfig] of Object.entries(route.typed_per_filter_config ?? {})) {
if (!validateOverrideFilter(filterConfig)) {
return false;
return null;
}
}
}
if (EXPERIMENTAL_RETRY) {
if (!this.validateRetryPolicy(route.route.retry_policy)) {
return false;
return null;
}
}
if (route.route!.cluster_specifier === 'weighted_clusters') {
@ -137,13 +137,13 @@ export class RdsState extends BaseXdsStreamState<RouteConfiguration__Output> imp
weightSum += clusterWeight.weight?.value ?? 0;
}
if (weightSum === 0 || weightSum > UINT32_MAX) {
return false;
return null;
}
if (EXPERIMENTAL_FAULT_INJECTION) {
for (const weightedCluster of route.route!.weighted_clusters!.clusters) {
for (const filterConfig of Object.values(weightedCluster.typed_per_filter_config ?? {})) {
if (!validateOverrideFilter(filterConfig)) {
return false;
return null;
}
}
}
@ -151,6 +151,6 @@ export class RdsState extends BaseXdsStreamState<RouteConfiguration__Output> imp
}
}
}
return true;
return message;
}
}

View File

@ -17,6 +17,9 @@
import { experimental, logVerbosity, Metadata, status, StatusObject } from "@grpc/grpc-js";
import { Any__Output } from "../generated/google/protobuf/Any";
import { ResourceCache } from "../resource-cache";
import { XdsServerConfig } from "../xds-bootstrap";
import { XdsResourceKey } from "../resources";
const TRACER_NAME = 'xds_client';
@ -53,7 +56,7 @@ export interface HandleResponseResult {
missing: string[];
}
export interface XdsStreamState<ResponseType> {
export interface XdsStreamState<ResponseType, UpdateType> {
versionInfo: string;
nonce: string;
getResourceNames(): string[];
@ -67,34 +70,45 @@ export interface XdsStreamState<ResponseType> {
reportStreamError(status: StatusObject): void;
reportAdsStreamStart(): void;
addWatcher(name: string, watcher: Watcher<ResponseType>): void;
removeWatcher(resourceName: string, watcher: Watcher<ResponseType>): void;
addWatcher(name: string, watcher: Watcher<UpdateType>): void;
removeWatcher(resourceName: string, watcher: Watcher<UpdateType>): void;
}
interface SubscriptionEntry<ResponseType> {
watchers: Watcher<ResponseType>[];
cachedResponse: ResponseType | null;
export interface XdsSubscriptionTracker<UpdateType> {
getResourceNames(): string[];
handleResourceUpdates(resourceList: ResourcePair<UpdateType>[]): void;
reportStreamError(status: StatusObject): void;
reportAdsStreamStart(): void;
addWatcher(key: string, watcher: Watcher<UpdateType>): void;
removeWatcher(key: string, watcher: Watcher<UpdateType>): void;
}
interface SubscriptionEntry<UpdateType> {
watchers: Watcher<UpdateType>[];
cachedResponse: UpdateType | null;
resourceTimer: NodeJS.Timer;
deletionIgnored: boolean;
}
const RESOURCE_TIMEOUT_MS = 15_000;
export abstract class BaseXdsStreamState<ResponseType> implements XdsStreamState<ResponseType> {
export abstract class BaseXdsStreamState<ResponseType, UpdateType> implements XdsStreamState<ResponseType, UpdateType> {
versionInfo = '';
nonce = '';
private subscriptions: Map<string, SubscriptionEntry<ResponseType>> = new Map<string, SubscriptionEntry<ResponseType>>();
private subscriptions: Map<string, SubscriptionEntry<UpdateType>> = new Map<string, SubscriptionEntry<UpdateType>>();
private isAdsStreamRunning = false;
private ignoreResourceDeletion = false;
constructor(private updateResourceNames: () => void) {}
constructor(protected xdsServer: XdsServerConfig, private updateResourceNames: () => void) {}
protected trace(text: string) {
experimental.trace(logVerbosity.DEBUG, TRACER_NAME, this.getProtocolName() + ' | ' + text);
}
private startResourceTimer(subscriptionEntry: SubscriptionEntry<ResponseType>) {
private startResourceTimer(subscriptionEntry: SubscriptionEntry<UpdateType>) {
clearTimeout(subscriptionEntry.resourceTimer);
subscriptionEntry.resourceTimer = setTimeout(() => {
for (const watcher of subscriptionEntry.watchers) {
@ -103,7 +117,7 @@ export abstract class BaseXdsStreamState<ResponseType> implements XdsStreamState
}, RESOURCE_TIMEOUT_MS);
}
addWatcher(name: string, watcher: Watcher<ResponseType>): void {
addWatcher(name: string, watcher: Watcher<UpdateType>): void {
this.trace('Adding watcher for name ' + name);
let subscriptionEntry = this.subscriptions.get(name);
let addedName = false;
@ -134,7 +148,7 @@ export abstract class BaseXdsStreamState<ResponseType> implements XdsStreamState
this.updateResourceNames();
}
}
removeWatcher(resourceName: string, watcher: Watcher<ResponseType>): void {
removeWatcher(resourceName: string, watcher: Watcher<UpdateType>): void {
this.trace('Removing watcher for name ' + resourceName);
const subscriptionEntry = this.subscriptions.get(resourceName);
if (subscriptionEntry !== undefined) {
@ -167,7 +181,8 @@ export abstract class BaseXdsStreamState<ResponseType> implements XdsStreamState
const resourceName = this.getResourceName(resource);
allResourceNames.add(resourceName);
const subscriptionEntry = this.subscriptions.get(resourceName);
if (this.validateResponse(resource)) {
const update = this.validateResponse(resource);
if (update) {
result.accepted.push({
name: resourceName,
raw: raw});
@ -176,11 +191,11 @@ export abstract class BaseXdsStreamState<ResponseType> implements XdsStreamState
/* Use process.nextTick to prevent errors from the watcher from
* bubbling up through here. */
process.nextTick(() => {
watcher.onValidUpdate(resource);
watcher.onValidUpdate(update);
});
}
clearTimeout(subscriptionEntry.resourceTimer);
subscriptionEntry.cachedResponse = resource;
subscriptionEntry.cachedResponse = update;
if (subscriptionEntry.deletionIgnored) {
experimental.log(logVerbosity.INFO, `Received resource with previously ignored deletion: ${resourceName}`);
subscriptionEntry.deletionIgnored = false;
@ -277,7 +292,7 @@ export abstract class BaseXdsStreamState<ResponseType> implements XdsStreamState
* the RDS validateResponse.
* @param resource The resource object sent by the xDS server
*/
public abstract validateResponse(resource: ResponseType): boolean;
public abstract validateResponse(resource: ResponseType): UpdateType | null;
/**
* Get the name of a resource object. The name is some field of the object, so
* getting it depends on the specific type.
@ -285,6 +300,7 @@ export abstract class BaseXdsStreamState<ResponseType> implements XdsStreamState
*/
protected abstract getResourceName(resource: ResponseType): string;
protected abstract getProtocolName(): string;
protected abstract getTypeUrl(): string;
/**
* Indicates whether responses are "state of the world", i.e. that they
* contain all resources and that omitted previously-seen resources should