diff --git a/packages/grpc-js-xds/src/environment.ts b/packages/grpc-js-xds/src/environment.ts index ede268fc..300f989e 100644 --- a/packages/grpc-js-xds/src/environment.ts +++ b/packages/grpc-js-xds/src/environment.ts @@ -26,3 +26,4 @@ export const EXPERIMENTAL_CUSTOM_LB_CONFIG = (process.env.GRPC_EXPERIMENTAL_XDS_ export const EXPERIMENTAL_RING_HASH = (process.env.GRPC_XDS_EXPERIMENTAL_ENABLE_RING_HASH ?? 'true') === 'true'; export const EXPERIMENTAL_PICK_FIRST = (process.env.GRPC_EXPERIMENTAL_PICKFIRST_LB_CONFIG ?? 'false') === 'true'; export const EXPERIMENTAL_DUALSTACK_ENDPOINTS = (process.env.GRPC_EXPERIMENTAL_XDS_DUALSTACK_ENDPOINTS ?? 'true') === 'true'; +export const AGGREGATE_CLUSTER_BACKWARDS_COMPAT = (process.env.GRPC_XDS_AGGREGATE_CLUSTER_BACKWARD_COMPAT ?? 'false') === 'true'; diff --git a/packages/grpc-js-xds/src/index.ts b/packages/grpc-js-xds/src/index.ts index 73546443..e080d194 100644 --- a/packages/grpc-js-xds/src/index.ts +++ b/packages/grpc-js-xds/src/index.ts @@ -17,7 +17,6 @@ import * as resolver_xds from './resolver-xds'; import * as load_balancer_cds from './load-balancer-cds'; -import * as xds_cluster_resolver from './load-balancer-xds-cluster-resolver'; import * as xds_cluster_impl from './load-balancer-xds-cluster-impl'; import * as load_balancer_priority from './load-balancer-priority'; import * as load_balancer_weighted_target from './load-balancer-weighted-target'; @@ -40,7 +39,6 @@ export { XdsServerCredentials } from './xds-credentials'; export function register() { resolver_xds.setup(); load_balancer_cds.setup(); - xds_cluster_resolver.setup(); xds_cluster_impl.setup(); load_balancer_priority.setup(); load_balancer_weighted_target.setup(); diff --git a/packages/grpc-js-xds/src/load-balancer-cds.ts b/packages/grpc-js-xds/src/load-balancer-cds.ts index a337a714..2ecb7996 100644 --- a/packages/grpc-js-xds/src/load-balancer-cds.ts +++ b/packages/grpc-js-xds/src/load-balancer-cds.ts @@ -16,8 +16,6 @@ */ import { connectivityState, status, Metadata, logVerbosity, experimental, LoadBalancingConfig, ChannelOptions, ChannelCredentials } from '@grpc/grpc-js'; -import { getSingletonXdsClient, Watcher, XdsClient } from './xds-client'; -import { Cluster__Output } from './generated/envoy/config/cluster/v3/Cluster'; import Endpoint = experimental.Endpoint; import UnavailablePicker = experimental.UnavailablePicker; import ChildLoadBalancerHandler = experimental.ChildLoadBalancerHandler; @@ -25,10 +23,11 @@ import LoadBalancer = experimental.LoadBalancer; import ChannelControlHelper = experimental.ChannelControlHelper; import registerLoadBalancerType = experimental.registerLoadBalancerType; import TypedLoadBalancingConfig = experimental.TypedLoadBalancingConfig; -import QueuePicker = experimental.QueuePicker; import parseLoadBalancingConfig = experimental.parseLoadBalancingConfig; -import { DiscoveryMechanism, XdsClusterResolverChildPolicyHandler } from './load-balancer-xds-cluster-resolver'; -import { CdsUpdate, ClusterResourceType } from './xds-resource-type/cluster-resource-type'; +import { XdsConfig } from './xds-dependency-manager'; +import { LocalityEndpoint, PriorityChildRaw } from './load-balancer-priority'; +import { Locality__Output } from './generated/envoy/config/core/v3/Locality'; +import { AGGREGATE_CLUSTER_BACKWARDS_COMPAT, EXPERIMENTAL_OUTLIER_DETECTION } from './environment'; const TRACER_NAME = 'cds_balancer'; @@ -65,189 +64,45 @@ class CdsLoadBalancingConfig implements TypedLoadBalancingConfig { } } -interface ClusterEntry { - watcher: Watcher; - latestUpdate?: CdsUpdate; - children: string[]; -} - -interface ClusterTree { - [name: string]: ClusterEntry; -} - -function isClusterTreeFullyUpdated(tree: ClusterTree, root: string): boolean { - const toCheck: string[] = [root]; - const visited = new Set(); - while (toCheck.length > 0) { - const next = toCheck.shift()!; - if (visited.has(next)) { - continue; - } - visited.add(next); - if (!tree[next] || !tree[next].latestUpdate) { - return false; - } - toCheck.push(...tree[next].children); - } - return true; -} - -function generateDiscoverymechanismForCdsUpdate(config: CdsUpdate): DiscoveryMechanism { - if (config.type === 'AGGREGATE') { - throw new Error('Cannot generate DiscoveryMechanism for AGGREGATE cluster'); - } - return { - cluster: config.name, - lrs_load_reporting_server: config.lrsLoadReportingServer, - max_concurrent_requests: config.maxConcurrentRequests, - type: config.type, - eds_service_name: config.edsServiceName, - dns_hostname: config.dnsHostname, - outlier_detection: config.outlierDetectionUpdate - }; -} const RECURSION_DEPTH_LIMIT = 15; -/** - * Prerequisite: isClusterTreeFullyUpdated(tree, root) - * @param tree - * @param root - */ -function getDiscoveryMechanismList(tree: ClusterTree, root: string): DiscoveryMechanism[] { - const visited = new Set(); - function getDiscoveryMechanismListHelper(node: string, depth: number): DiscoveryMechanism[] { - if (depth > RECURSION_DEPTH_LIMIT) { - throw new Error('aggregate cluster graph exceeds max depth'); - } - if (visited.has(node)) { - return []; - } - visited.add(node); - if (tree[node].children.length > 0) { - trace('Visit ' + node + ' children: [' + tree[node].children + ']'); - // Aggregate cluster - const result = []; - for (const child of tree[node].children) { - result.push(...getDiscoveryMechanismListHelper(child, depth + 1)); - } - return result; - } else { - trace('Visit leaf ' + node); - // individual cluster - const config = tree[node].latestUpdate!; - return [generateDiscoverymechanismForCdsUpdate(config)]; - } +function getLeafClusters(xdsConfig: XdsConfig, rootCluster: string, depth = 0): string[] { + if (depth > RECURSION_DEPTH_LIMIT) { + throw new Error(`aggregate cluster graph exceeds max depth of ${RECURSION_DEPTH_LIMIT}`); } - return getDiscoveryMechanismListHelper(root, 0); + const maybeClusterConfig = xdsConfig.clusters.get(rootCluster); + if (!maybeClusterConfig) { + return []; + } + if (!maybeClusterConfig.success) { + return [rootCluster]; + } + if (maybeClusterConfig.value.children.type === 'aggregate') { + return ([] as string[]).concat(...maybeClusterConfig.value.children.leafClusters.map(childCluster => getLeafClusters(xdsConfig, childCluster, depth + 1))) + } else { + return [rootCluster]; + } +} + +export function localityToName(locality: Locality__Output) { + return `{region=${locality.region},zone=${locality.zone},sub_zone=${locality.sub_zone}}`; } export class CdsLoadBalancer implements LoadBalancer { private childBalancer: ChildLoadBalancerHandler; - private latestCdsUpdate: Cluster__Output | null = null; - private latestConfig: CdsLoadBalancingConfig | null = null; - private latestAttributes: { [key: string]: unknown } = {}; - private xdsClient: XdsClient | null = null; - - private clusterTree: ClusterTree = {}; - - private updatedChild = false; + private localityPriorities: Map = new Map(); + private priorityNames: string[] = []; + private nextPriorityChildNumber = 0; constructor(private readonly channelControlHelper: ChannelControlHelper, credentials: ChannelCredentials, options: ChannelOptions) { - this.childBalancer = new XdsClusterResolverChildPolicyHandler(channelControlHelper, credentials, options); + this.childBalancer = new ChildLoadBalancerHandler(channelControlHelper, credentials, options); } - private reportError(errorMessage: string) { - trace('CDS cluster reporting error ' + errorMessage); - this.channelControlHelper.updateState(connectivityState.TRANSIENT_FAILURE, new UnavailablePicker({code: status.UNAVAILABLE, details: errorMessage, metadata: new Metadata()})); - } - - private addCluster(cluster: string) { - if (cluster in this.clusterTree) { - return; - } - trace('Adding watcher for cluster ' + cluster); - const watcher: Watcher = new Watcher({ - onResourceChanged: (update) => { - this.clusterTree[cluster].latestUpdate = update; - if (update.type === 'AGGREGATE') { - const children = update.aggregateChildren - trace('Received update for aggregate cluster ' + cluster + ' with children [' + children + ']'); - this.clusterTree[cluster].children = children; - children.forEach(child => this.addCluster(child)); - } - if (isClusterTreeFullyUpdated(this.clusterTree, this.latestConfig!.getCluster())) { - let discoveryMechanismList: DiscoveryMechanism[]; - try { - discoveryMechanismList = getDiscoveryMechanismList(this.clusterTree, this.latestConfig!.getCluster()); - } catch (e) { - this.reportError((e as Error).message); - return; - } - const rootClusterUpdate = this.clusterTree[this.latestConfig!.getCluster()].latestUpdate!; - const clusterResolverConfig: LoadBalancingConfig = { - xds_cluster_resolver: { - discovery_mechanisms: discoveryMechanismList, - xds_lb_policy: rootClusterUpdate.lbPolicyConfig - } - }; - let parsedClusterResolverConfig: TypedLoadBalancingConfig; - try { - parsedClusterResolverConfig = parseLoadBalancingConfig(clusterResolverConfig); - } catch (e) { - this.reportError(`CDS cluster ${this.latestConfig?.getCluster()} child config parsing failed with error ${(e as Error).message}`); - return; - } - trace('Child update config: ' + JSON.stringify(clusterResolverConfig)); - this.updatedChild = true; - this.childBalancer.updateAddressList( - [], - parsedClusterResolverConfig, - this.latestAttributes - ); - } - }, - onResourceDoesNotExist: () => { - trace('Received onResourceDoesNotExist update for cluster ' + cluster); - if (cluster in this.clusterTree) { - this.clusterTree[cluster].latestUpdate = undefined; - this.clusterTree[cluster].children = []; - } - this.reportError(`CDS resource ${cluster} does not exist`); - this.childBalancer.destroy(); - }, - onError: (statusObj) => { - if (!this.updatedChild) { - trace('Transitioning to transient failure due to onError update for cluster' + cluster); - this.reportError(`xDS request failed with error ${statusObj.details}`); - } - } - }); - this.clusterTree[cluster] = { - watcher: watcher, - children: [] - }; - if (this.xdsClient) { - ClusterResourceType.startWatch(this.xdsClient, cluster, watcher); - } - } - - private removeCluster(cluster: string) { - if (!(cluster in this.clusterTree)) { - return; - } - if (this.xdsClient) { - ClusterResourceType.cancelWatch(this.xdsClient, cluster, this.clusterTree[cluster].watcher); - } - delete this.clusterTree[cluster]; - } - - private clearClusterTree() { - for (const cluster of Object.keys(this.clusterTree)) { - this.removeCluster(cluster); - } + private getNextPriorityName(cluster: string) { + return `cluster=${cluster}, child_number=${this.nextPriorityChildNumber++}`; } updateAddressList( @@ -260,26 +115,170 @@ export class CdsLoadBalancer implements LoadBalancer { return; } trace('Received update with config ' + JSON.stringify(lbConfig, undefined, 2)); - this.latestAttributes = attributes; - this.xdsClient = attributes.xdsClient as XdsClient; - - /* If the cluster is changing, disable the old watcher before adding the new - * one */ - if ( - this.latestConfig && this.latestConfig.getCluster() !== lbConfig.getCluster() - ) { - trace('Removing old cluster watchers rooted at ' + this.latestConfig.getCluster()); - this.clearClusterTree(); - this.updatedChild = false; + const xdsConfig = attributes.xdsConfig as XdsConfig; + const clusterName = lbConfig.getCluster(); + const maybeClusterConfig = xdsConfig.clusters.get(clusterName); + if (!maybeClusterConfig) { + trace('Received update with no config for cluster ' + clusterName); + return; } - - if (!this.latestConfig) { - this.channelControlHelper.updateState(connectivityState.CONNECTING, new QueuePicker(this)); + if (!maybeClusterConfig.success) { + this.childBalancer.destroy(); + this.channelControlHelper.updateState(connectivityState.TRANSIENT_FAILURE, new UnavailablePicker(maybeClusterConfig.error)); + return; } + const clusterConfig = maybeClusterConfig.value; - this.latestConfig = lbConfig; + if (clusterConfig.children.type === 'aggregate') { + let leafClusters: string[]; + try { + leafClusters = getLeafClusters(xdsConfig, clusterName); + } catch (e) { + trace('xDS config parsing failed with error ' + (e as Error).message); + this.channelControlHelper.updateState(connectivityState.TRANSIENT_FAILURE, new UnavailablePicker({code: status.UNAVAILABLE, details: `xDS config parsing failed with error ${(e as Error).message}`, metadata: new Metadata()})); + return; + } + const priorityChildren: {[name: string]: PriorityChildRaw} = {}; + for (const cluster of leafClusters) { + priorityChildren[cluster] = { + config: [{ + cds: { + cluster: cluster + } + }], + ignore_reresolution_requests: false + }; + } + const childConfig = { + priority: { + children: priorityChildren, + priorities: leafClusters + } + }; + let typedChildConfig: TypedLoadBalancingConfig; + try { + typedChildConfig = parseLoadBalancingConfig(childConfig); + } catch (e) { + trace('LB policy config parsing failed with error ' + (e as Error).message); + this.channelControlHelper.updateState(connectivityState.TRANSIENT_FAILURE, new UnavailablePicker({code: status.UNAVAILABLE, details: `LB policy config parsing failed with error ${(e as Error).message}`, metadata: new Metadata()})); + return; + } + this.childBalancer.updateAddressList(endpointList, typedChildConfig, {...attributes, rootCluster: clusterName}); + } else { + if (!clusterConfig.children.endpoints) { + trace('Received update with no resolved endpoints for cluster ' + clusterName); + this.channelControlHelper.updateState(connectivityState.TRANSIENT_FAILURE, new UnavailablePicker({code: status.UNAVAILABLE, details: `Cluster ${clusterName} resolution failed: ${clusterConfig.children.resolutionNote}`})); + return; + } + const newPriorityNames: string[] = []; + const newLocalityPriorities = new Map(); + const priorityChildren: {[name: string]: PriorityChildRaw} = {}; + const childEndpointList: LocalityEndpoint[] = []; + let endpointPickingPolicy: LoadBalancingConfig[]; + if (clusterConfig.cluster.type === 'EDS') { + endpointPickingPolicy = clusterConfig.cluster.lbPolicyConfig; + if (AGGREGATE_CLUSTER_BACKWARDS_COMPAT) { + if (typeof attributes.rootCluster === 'string') { + const maybeRootClusterConfig = xdsConfig.clusters.get(attributes.rootCluster); + if (maybeRootClusterConfig?.success) { + endpointPickingPolicy = maybeRootClusterConfig.value.cluster.lbPolicyConfig; + } + } + } + } else { + endpointPickingPolicy = [{ pick_first: {} }]; + } + for (const [priority, priorityEntry] of clusterConfig.children.endpoints.priorities.entries()) { + /** + * Highest (smallest number) priority value that any of the localities in + * this locality array had a in the previous mapping. + */ + let highestOldPriority = Infinity; + for (const localityObj of priorityEntry.localities) { + const oldPriority = this.localityPriorities.get( + localityToName(localityObj.locality) + ); + if ( + oldPriority !== undefined && + oldPriority >= priority && + oldPriority < highestOldPriority + ) { + highestOldPriority = oldPriority; + } + } + let newPriorityName: string; + if (highestOldPriority === Infinity) { + /* No existing priority at or below the same number as the priority we + * are looking at had any of the localities in this priority. So, we + * use a new name. */ + newPriorityName = this.getNextPriorityName(clusterName); + } else { + const newName = this.priorityNames[highestOldPriority]; + if (newPriorityNames.indexOf(newName) < 0) { + newPriorityName = newName; + } else { + newPriorityName = this.getNextPriorityName(clusterName); + } + } + newPriorityNames[priority] = newPriorityName; - this.addCluster(lbConfig.getCluster()); + for (const localityObj of priorityEntry.localities) { + for (const weightedEndpoint of localityObj.endpoints) { + childEndpointList.push({ + localityPath: [ + newPriorityName, + localityToName(localityObj.locality), + ], + locality: localityObj.locality, + localityWeight: localityObj.weight, + endpointWeight: localityObj.weight * weightedEndpoint.weight, + ...weightedEndpoint.endpoint + }); + } + newLocalityPriorities.set(localityToName(localityObj.locality), priority); + } + + priorityChildren[newPriorityName] = { + config: endpointPickingPolicy, + ignore_reresolution_requests: clusterConfig.cluster.type === 'EDS' + }; + } + this.localityPriorities = newLocalityPriorities; + this.priorityNames = newPriorityNames; + const xdsClusterImplConfig = { + xds_cluster_impl: { + cluster: clusterName, + child_policy: [{ + priority: { + children: priorityChildren, + priorities: newPriorityNames + } + }] + } + }; + let childConfig: LoadBalancingConfig; + if (EXPERIMENTAL_OUTLIER_DETECTION) { + childConfig = { + outlier_detection: { + ...clusterConfig.cluster.outlierDetectionUpdate, + child_policy: [xdsClusterImplConfig] + } + } + } else { + childConfig = xdsClusterImplConfig; + } + trace(JSON.stringify(childConfig, undefined, 2)); + let typedChildConfig: TypedLoadBalancingConfig; + try { + typedChildConfig = parseLoadBalancingConfig(childConfig); + } catch (e) { + trace('LB policy config parsing failed with error ' + (e as Error).message); + this.channelControlHelper.updateState(connectivityState.TRANSIENT_FAILURE, new UnavailablePicker({code: status.UNAVAILABLE, details: `LB policy config parsing failed with error ${(e as Error).message}`, metadata: new Metadata()})); + return; + } + trace(JSON.stringify(typedChildConfig.toJsonObject(), undefined, 2)); + this.childBalancer.updateAddressList(childEndpointList, typedChildConfig, attributes); + } } exitIdle(): void { this.childBalancer.exitIdle(); @@ -290,7 +289,6 @@ export class CdsLoadBalancer implements LoadBalancer { destroy(): void { trace('Destroying load balancer rooted at cluster named ' + this.latestConfig?.getCluster()); this.childBalancer.destroy(); - this.clearClusterTree(); } getTypeName(): string { return TYPE_NAME; diff --git a/packages/grpc-js-xds/src/load-balancer-xds-cluster-impl.ts b/packages/grpc-js-xds/src/load-balancer-xds-cluster-impl.ts index 0538cdbe..f291252d 100644 --- a/packages/grpc-js-xds/src/load-balancer-xds-cluster-impl.ts +++ b/packages/grpc-js-xds/src/load-balancer-xds-cluster-impl.ts @@ -36,7 +36,10 @@ import TypedLoadBalancingConfig = experimental.TypedLoadBalancingConfig; import selectLbConfigFromList = experimental.selectLbConfigFromList; import SubchannelInterface = experimental.SubchannelInterface; import BaseSubchannelWrapper = experimental.BaseSubchannelWrapper; +import UnavailablePicker = experimental.UnavailablePicker; import { Locality__Output } from "./generated/envoy/config/core/v3/Locality"; +import { ClusterConfig, XdsConfig } from "./xds-dependency-manager"; +import { CdsUpdate } from "./xds-resource-type/cluster-resource-type"; const TRACER_NAME = 'xds_cluster_impl'; @@ -53,59 +56,26 @@ export interface DropCategory { requests_per_million: number; } -function validateDropCategory(obj: any): DropCategory { - if (!('category' in obj && typeof obj.category === 'string')) { - throw new Error('xds_cluster_impl config drop_categories entry must have a string field category'); - } - if (!('requests_per_million' in obj && typeof obj.requests_per_million === 'number')) { - throw new Error('xds_cluster_impl config drop_categories entry must have a number field requests_per_million'); - } - return obj; -} - class XdsClusterImplLoadBalancingConfig implements TypedLoadBalancingConfig { - private maxConcurrentRequests: number; getLoadBalancerName(): string { return TYPE_NAME; } toJsonObject(): object { const jsonObj: {[key: string]: any} = { cluster: this.cluster, - drop_categories: this.dropCategories, child_policy: [this.childPolicy.toJsonObject()], - max_concurrent_requests: this.maxConcurrentRequests, - eds_service_name: this.edsServiceName, - lrs_load_reporting_server: this.lrsLoadReportingServer, }; return { [TYPE_NAME]: jsonObj }; } - constructor(private cluster: string, private dropCategories: DropCategory[], private childPolicy: TypedLoadBalancingConfig, private edsServiceName: string, private lrsLoadReportingServer?: XdsServerConfig, maxConcurrentRequests?: number) { - this.maxConcurrentRequests = maxConcurrentRequests ?? DEFAULT_MAX_CONCURRENT_REQUESTS; - } + constructor(private cluster: string, private childPolicy: TypedLoadBalancingConfig) {} getCluster() { return this.cluster; } - getEdsServiceName() { - return this.edsServiceName; - } - - getLrsLoadReportingServer() { - return this.lrsLoadReportingServer; - } - - getMaxConcurrentRequests() { - return this.maxConcurrentRequests; - } - - getDropCategories() { - return this.dropCategories; - } - getChildPolicy() { return this.childPolicy; } @@ -114,15 +84,6 @@ class XdsClusterImplLoadBalancingConfig implements TypedLoadBalancingConfig { if (!('cluster' in obj && typeof obj.cluster === 'string')) { throw new Error('xds_cluster_impl config must have a string field cluster'); } - if (!('eds_service_name' in obj && typeof obj.eds_service_name === 'string')) { - throw new Error('xds_cluster_impl config must have a string field eds_service_name'); - } - if ('max_concurrent_requests' in obj && !(obj.max_concurrent_requests === undefined || typeof obj.max_concurrent_requests === 'number')) { - throw new Error('xds_cluster_impl config max_concurrent_requests must be a number if provided'); - } - if (!('drop_categories' in obj && Array.isArray(obj.drop_categories))) { - throw new Error('xds_cluster_impl config must have an array field drop_categories'); - } if (!('child_policy' in obj && Array.isArray(obj.child_policy))) { throw new Error('xds_cluster_impl config must have an array field child_policy'); } @@ -130,11 +91,7 @@ class XdsClusterImplLoadBalancingConfig implements TypedLoadBalancingConfig { if (!childConfig) { throw new Error('xds_cluster_impl config child_policy parsing failed'); } - let lrsServer: XdsServerConfig | undefined = undefined; - if (obj.lrs_load_reporting_server) { - lrsServer = validateXdsServerConfig(obj.lrs_load_reporting_server) - } - return new XdsClusterImplLoadBalancingConfig(obj.cluster, obj.drop_categories.map(validateDropCategory), childConfig, obj.eds_service_name, lrsServer, obj.max_concurrent_requests); + return new XdsClusterImplLoadBalancingConfig(obj.cluster, childConfig); } } @@ -252,11 +209,12 @@ class XdsClusterImplBalancer implements LoadBalancer { private latestConfig: XdsClusterImplLoadBalancingConfig | null = null; private clusterDropStats: XdsClusterDropStats | null = null; private xdsClient: XdsClient | null = null; + private latestClusterConfig: ClusterConfig | null = null; constructor(private readonly channelControlHelper: ChannelControlHelper, credentials: ChannelCredentials, options: ChannelOptions) { this.childBalancer = new ChildLoadBalancerHandler(createChildChannelControlHelper(channelControlHelper, { createSubchannel: (subchannelAddress, subchannelArgs, credentialsOverride) => { - if (!this.xdsClient || !this.latestConfig || !this.lastestEndpointList) { + if (!this.xdsClient || !this.latestConfig || !this.lastestEndpointList || !this.latestClusterConfig) { throw new Error('xds_cluster_impl: invalid state: createSubchannel called with xdsClient or latestConfig not populated'); } const wrapperChild = channelControlHelper.createSubchannel(subchannelAddress, subchannelArgs, credentialsOverride); @@ -270,23 +228,23 @@ class XdsClusterImplBalancer implements LoadBalancer { trace('Not reporting load for address ' + subchannelAddressToString(subchannelAddress) + ' because it has unknown locality.'); return wrapperChild; } - const lrsServer = this.latestConfig.getLrsLoadReportingServer(); + const lrsServer = this.latestClusterConfig.cluster.lrsLoadReportingServer; let statsObj: XdsClusterLocalityStats | null = null; if (lrsServer) { statsObj = this.xdsClient.addClusterLocalityStats( lrsServer, this.latestConfig.getCluster(), - this.latestConfig.getEdsServiceName(), + this.latestClusterConfig.cluster.edsServiceName ?? '', locality ); } return new LocalitySubchannelWrapper(wrapperChild, statsObj); }, updateState: (connectivityState, originalPicker) => { - if (this.latestConfig === null) { + if (this.latestConfig === null || this.latestClusterConfig === null || this.latestClusterConfig.children.type === 'aggregate' || !this.latestClusterConfig.children.endpoints) { channelControlHelper.updateState(connectivityState, originalPicker); } else { - const picker = new XdsClusterImplPicker(originalPicker, getCallCounterMapKey(this.latestConfig.getCluster(), this.latestConfig.getEdsServiceName()), this.latestConfig.getMaxConcurrentRequests(), this.latestConfig.getDropCategories(), this.clusterDropStats); + const picker = new XdsClusterImplPicker(originalPicker, getCallCounterMapKey(this.latestConfig.getCluster(), this.latestClusterConfig.cluster.edsServiceName), this.latestClusterConfig.cluster.maxConcurrentRequests ?? DEFAULT_MAX_CONCURRENT_REQUESTS, this.latestClusterConfig.children.endpoints.dropCategories, this.clusterDropStats); channelControlHelper.updateState(connectivityState, picker); } } @@ -297,15 +255,38 @@ class XdsClusterImplBalancer implements LoadBalancer { trace('Discarding address list update with unrecognized config ' + JSON.stringify(lbConfig.toJsonObject(), undefined, 2)); return; } - trace('Received update with config: ' + JSON.stringify(lbConfig, undefined, 2)); + trace('Received update with config: ' + JSON.stringify(lbConfig.toJsonObject(), undefined, 2)); + const xdsConfig = attributes.xdsConfig as XdsConfig; + const maybeClusterConfig = xdsConfig.clusters.get(lbConfig.getCluster()); + if (!maybeClusterConfig) { + trace('Received update with no config for cluster ' + lbConfig.getCluster()); + return; + } + if (!maybeClusterConfig.success) { + this.latestClusterConfig = null; + this.childBalancer.destroy(); + this.channelControlHelper.updateState(connectivityState.TRANSIENT_FAILURE, new UnavailablePicker(maybeClusterConfig.error)); + return; + } + const clusterConfig = maybeClusterConfig.value; + if (clusterConfig.children.type === 'aggregate') { + trace('Received update for aggregate cluster ' + lbConfig.getCluster()); + return; + } + if (!clusterConfig.children.endpoints) { + this.childBalancer.destroy(); + this.channelControlHelper.updateState(connectivityState.TRANSIENT_FAILURE, new UnavailablePicker({details: clusterConfig.children.resolutionNote})); + + } this.lastestEndpointList = endpointList; this.latestConfig = lbConfig; + this.latestClusterConfig = clusterConfig; this.xdsClient = attributes.xdsClient as XdsClient; - if (lbConfig.getLrsLoadReportingServer()) { + if (clusterConfig.cluster.lrsLoadReportingServer) { this.clusterDropStats = this.xdsClient.addClusterDropStats( - lbConfig.getLrsLoadReportingServer()!, + clusterConfig.cluster.lrsLoadReportingServer, lbConfig.getCluster(), - lbConfig.getEdsServiceName() ?? '' + clusterConfig.cluster.edsServiceName ?? '' ); } diff --git a/packages/grpc-js-xds/src/load-balancer-xds-cluster-resolver.ts b/packages/grpc-js-xds/src/load-balancer-xds-cluster-resolver.ts deleted file mode 100644 index 0f87eee6..00000000 --- a/packages/grpc-js-xds/src/load-balancer-xds-cluster-resolver.ts +++ /dev/null @@ -1,508 +0,0 @@ -/* - * Copyright 2023 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -import { ChannelCredentials, ChannelOptions, LoadBalancingConfig, Metadata, connectivityState, experimental, logVerbosity, status } from "@grpc/grpc-js"; -import { registerLoadBalancerType } from "@grpc/grpc-js/build/src/load-balancer"; -import { EXPERIMENTAL_DUALSTACK_ENDPOINTS, EXPERIMENTAL_OUTLIER_DETECTION } from "./environment"; -import { Locality__Output } from "./generated/envoy/config/core/v3/Locality"; -import { ClusterLoadAssignment__Output } from "./generated/envoy/config/endpoint/v3/ClusterLoadAssignment"; -import { LocalityEndpoint, PriorityChildRaw } from "./load-balancer-priority"; -import { getSingletonXdsClient, Watcher, XdsClient } from "./xds-client"; -import { DropCategory } from "./load-balancer-xds-cluster-impl"; - -import TypedLoadBalancingConfig = experimental.TypedLoadBalancingConfig; -import LoadBalancer = experimental.LoadBalancer; -import Resolver = experimental.Resolver; -import SubchannelAddress = experimental.SubchannelAddress; -import Endpoint = experimental.Endpoint; -import ChildLoadBalancerHandler = experimental.ChildLoadBalancerHandler; -import createResolver = experimental.createResolver; -import ChannelControlHelper = experimental.ChannelControlHelper; -import OutlierDetectionRawConfig = experimental.OutlierDetectionRawConfig; -import subchannelAddressToString = experimental.subchannelAddressToString; -import endpointToString = experimental.endpointToString; -import selectLbConfigFromList = experimental.selectLbConfigFromList; -import parseLoadBalancingConfig = experimental.parseLoadBalancingConfig; -import UnavailablePicker = experimental.UnavailablePicker; -import { serverConfigEqual, validateXdsServerConfig, XdsServerConfig } from "./xds-bootstrap"; -import { EndpointResourceType } from "./xds-resource-type/endpoint-resource-type"; -import { SocketAddress__Output } from "./generated/envoy/config/core/v3/SocketAddress"; - -const TRACER_NAME = 'xds_cluster_resolver'; - -function trace(text: string): void { - experimental.trace(logVerbosity.DEBUG, TRACER_NAME, text); -} - -export interface DiscoveryMechanism { - cluster: string; - lrs_load_reporting_server?: XdsServerConfig; - max_concurrent_requests?: number; - type: 'EDS' | 'LOGICAL_DNS'; - eds_service_name?: string; - dns_hostname?: string; - outlier_detection?: OutlierDetectionRawConfig; -} - -function validateDiscoveryMechanism(obj: any): DiscoveryMechanism { - if (!('cluster' in obj && typeof obj.cluster === 'string')) { - throw new Error('discovery_mechanisms entry must have a string field cluster'); - } - if (!('type' in obj && (obj.type === 'EDS' || obj.type === 'LOGICAL_DNS'))) { - throw new Error('discovery_mechanisms entry must have a field "type" with the value "EDS" or "LOGICAL_DNS"'); - } - if ('max_concurrent_requests' in obj && obj.max_concurrent_requests !== undefined && typeof obj.max_concurrent_requests !== "number") { - throw new Error('discovery_mechanisms entry max_concurrent_requests field must be a number if provided'); - } - if ('eds_service_name' in obj && obj.eds_service_name !== undefined && typeof obj.eds_service_name !== 'string') { - throw new Error('discovery_mechanisms entry eds_service_name field must be a string if provided'); - } - if ('dns_hostname' in obj && obj.dns_hostname !== undefined && typeof obj.dns_hostname !== 'string') { - throw new Error('discovery_mechanisms entry dns_hostname field must be a string if provided'); - } - return {...obj, lrs_load_reporting_server: obj.lrs_load_reporting_server ? validateXdsServerConfig(obj.lrs_load_reporting_server) : undefined}; -} - -const TYPE_NAME = 'xds_cluster_resolver'; - -class XdsClusterResolverLoadBalancingConfig implements TypedLoadBalancingConfig { - getLoadBalancerName(): string { - return TYPE_NAME; - } - toJsonObject(): object { - return { - [TYPE_NAME]: { - discovery_mechanisms: this.discoveryMechanisms, - xds_lb_policy: this.xdsLbPolicy - } - } - } - - constructor(private discoveryMechanisms: DiscoveryMechanism[], private xdsLbPolicy: LoadBalancingConfig[]) {} - - getDiscoveryMechanisms() { - return this.discoveryMechanisms; - } - - getXdsLbPolicy() { - return this.xdsLbPolicy; - } - - static createFromJson(obj: any): XdsClusterResolverLoadBalancingConfig { - if (!('discovery_mechanisms' in obj && Array.isArray(obj.discovery_mechanisms))) { - throw new Error('xds_cluster_resolver config must have a discovery_mechanisms array'); - } - if (!('xds_lb_policy' in obj && Array.isArray(obj.xds_lb_policy))) { - throw new Error('xds_cluster_resolver config must have a xds_lb_policy array'); - } - return new XdsClusterResolverLoadBalancingConfig( - obj.discovery_mechanisms.map(validateDiscoveryMechanism), - obj.xds_lb_policy - ); - } -} - -interface WeightedEndpoint { - endpoint: Endpoint; - weight: number; -} - -interface LocalityEntry { - locality: Locality__Output; - weight: number; - endpoints: WeightedEndpoint[]; -} - -interface PriorityEntry { - localities: LocalityEntry[]; - dropCategories: DropCategory[]; -} - -interface DiscoveryMechanismEntry { - discoveryMechanism: DiscoveryMechanism; - localityPriorities: Map; - priorityNames: string[]; - nextPriorityChildNumber: number; - watcher?: Watcher; - resolver?: Resolver; - latestUpdate?: PriorityEntry[]; -} - -function getEdsPriorities(edsUpdate: ClusterLoadAssignment__Output): PriorityEntry[] { - const result: PriorityEntry[] = []; - const dropCategories: DropCategory[] = []; - if (edsUpdate.policy) { - for (const dropOverload of edsUpdate.policy.drop_overloads) { - if (!dropOverload.drop_percentage) { - continue; - } - let requestsPerMillion: number; - switch (dropOverload.drop_percentage.denominator) { - case 'HUNDRED': - requestsPerMillion = dropOverload.drop_percentage.numerator * 10_000; - break; - case 'TEN_THOUSAND': - requestsPerMillion = dropOverload.drop_percentage.numerator * 100; - break; - case 'MILLION': - requestsPerMillion = dropOverload.drop_percentage.numerator; - break; - } - dropCategories.push({ - category: dropOverload.category, - requests_per_million: requestsPerMillion - }); - } - } - for (const endpoint of edsUpdate.endpoints) { - if (!endpoint.load_balancing_weight) { - continue; - } - const endpoints: WeightedEndpoint[] = endpoint.lb_endpoints.filter(lbEndpoint => lbEndpoint.health_status === 'UNKNOWN' || lbEndpoint.health_status === 'HEALTHY').map( - (lbEndpoint) => { - /* The validator in the XdsClient class ensures that each endpoint has - * a socket_address with an IP address and a port_value. */ - let socketAddresses: SocketAddress__Output[]; - if (EXPERIMENTAL_DUALSTACK_ENDPOINTS) { - socketAddresses = [ - lbEndpoint.endpoint!.address!.socket_address!, - ...lbEndpoint.endpoint!.additional_addresses.map(additionalAddress => additionalAddress.address!.socket_address!) - ]; - } else { - socketAddresses = [lbEndpoint.endpoint!.address!.socket_address!]; - } - return { - endpoint: { - addresses: socketAddresses.map(socketAddress => ({ - host: socketAddress.address!, - port: socketAddress.port_value! - })) - }, - weight: lbEndpoint.load_balancing_weight?.value ?? 1 - }; - } - ); - if (endpoints.length === 0) { - continue; - } - let priorityEntry: PriorityEntry; - if (result[endpoint.priority]) { - priorityEntry = result[endpoint.priority]; - } else { - priorityEntry = { - localities: [], - dropCategories: dropCategories - }; - result[endpoint.priority] = priorityEntry; - } - priorityEntry.localities.push({ - locality: endpoint.locality!, - endpoints: endpoints, - weight: endpoint.load_balancing_weight.value - }); - } - // Collapse spaces in sparse array - return result.filter(priority => priority); -} - -function getDnsPriorities(endpoints: Endpoint[]): PriorityEntry[] { - return [{ - localities: [{ - locality: { - region: '', - zone: '', - sub_zone: '' - }, - weight: 1, - endpoints: endpoints.map(endpoint => ({endpoint: endpoint, weight: 1})) - }], - dropCategories: [] - }]; -} - -export function localityToName(locality: Locality__Output) { - return `{region=${locality.region},zone=${locality.zone},sub_zone=${locality.sub_zone}}`; -} - -function getNextPriorityName(entry: DiscoveryMechanismEntry): string { - return `cluster=${entry.discoveryMechanism.cluster}, child_number=${entry.nextPriorityChildNumber++}`; -} - -export class XdsClusterResolver implements LoadBalancer { - private discoveryMechanismList: DiscoveryMechanismEntry[] = []; - private latestConfig: XdsClusterResolverLoadBalancingConfig | null = null; - private latestAttributes: { [key: string]: unknown; } = {}; - private xdsClient: XdsClient | null = null; - private childBalancer: ChildLoadBalancerHandler; - - constructor(private readonly channelControlHelper: ChannelControlHelper, credentials: ChannelCredentials, options: ChannelOptions) { - this.childBalancer = new ChildLoadBalancerHandler(experimental.createChildChannelControlHelper(channelControlHelper, { - requestReresolution: () => { - for (const entry of this.discoveryMechanismList) { - entry.resolver?.updateResolution(); - } - } - }), credentials, options); - } - - private maybeUpdateChild() { - if (!this.latestConfig) { - return; - } - for (const entry of this.discoveryMechanismList) { - if (!entry.latestUpdate) { - return; - } - } - const fullPriorityList: string[] = []; - const priorityChildren: {[name: string]: PriorityChildRaw} = {}; - const endpointList: LocalityEndpoint[] = []; - const edsChildPolicy = this.latestConfig.getXdsLbPolicy(); - for (const entry of this.discoveryMechanismList) { - const newPriorityNames: string[] = []; - const newLocalityPriorities = new Map(); - const xdsClusterImplChildPolicy: LoadBalancingConfig[] = entry.discoveryMechanism.type === 'EDS' ? edsChildPolicy : [{ pick_first: {} }]; - - for (const [priority, priorityEntry] of entry.latestUpdate!.entries()) { - /** - * Highest (smallest number) priority value that any of the localities in - * this locality array had a in the previous mapping. - */ - let highestOldPriority = Infinity; - for (const localityObj of priorityEntry.localities) { - const oldPriority = entry.localityPriorities.get( - localityToName(localityObj.locality) - ); - if ( - oldPriority !== undefined && - oldPriority >= priority && - oldPriority < highestOldPriority - ) { - highestOldPriority = oldPriority; - } - } - let newPriorityName: string; - if (highestOldPriority === Infinity) { - /* No existing priority at or below the same number as the priority we - * are looking at had any of the localities in this priority. So, we - * use a new name. */ - newPriorityName = getNextPriorityName(entry); - } else { - const newName = entry.priorityNames[highestOldPriority]; - if (newPriorityNames.indexOf(newName) < 0) { - newPriorityName = newName; - } else { - newPriorityName = getNextPriorityName(entry); - } - } - newPriorityNames[priority] = newPriorityName; - - for (const localityObj of priorityEntry.localities) { - for (const weightedEndpoint of localityObj.endpoints) { - endpointList.push({ - localityPath: [ - newPriorityName, - localityToName(localityObj.locality), - ], - locality: localityObj.locality, - localityWeight: localityObj.weight, - endpointWeight: localityObj.weight * weightedEndpoint.weight, - ...weightedEndpoint.endpoint - }); - } - newLocalityPriorities.set(localityToName(localityObj.locality), priority); - } - const xdsClusterImplConfig = { - xds_cluster_impl: { - cluster: entry.discoveryMechanism.cluster, - drop_categories: priorityEntry.dropCategories, - max_concurrent_requests: entry.discoveryMechanism.max_concurrent_requests, - eds_service_name: entry.discoveryMechanism.eds_service_name ?? '', - lrs_load_reporting_server: entry.discoveryMechanism.lrs_load_reporting_server, - child_policy: xdsClusterImplChildPolicy - } - } - let priorityChildConfig: LoadBalancingConfig; - if (EXPERIMENTAL_OUTLIER_DETECTION) { - priorityChildConfig = { - outlier_detection: { - ...entry.discoveryMechanism.outlier_detection, - child_policy: [xdsClusterImplConfig] - } - } - } else { - priorityChildConfig = xdsClusterImplConfig; - } - - priorityChildren[newPriorityName] = { - config: [priorityChildConfig], - ignore_reresolution_requests: entry.discoveryMechanism.type === 'EDS' - }; - } - entry.localityPriorities = newLocalityPriorities; - entry.priorityNames = newPriorityNames; - fullPriorityList.push(...newPriorityNames); - } - const childConfig = { - priority: { - children: priorityChildren, - priorities: fullPriorityList - } - } - let typedChildConfig: TypedLoadBalancingConfig; - try { - typedChildConfig = parseLoadBalancingConfig(childConfig); - } catch (e) { - trace('LB policy config parsing failed with error ' + (e as Error).message); - this.channelControlHelper.updateState(connectivityState.TRANSIENT_FAILURE, new UnavailablePicker({code: status.UNAVAILABLE, details: `LB policy config parsing failed with error ${(e as Error).message}`, metadata: new Metadata()})); - return; - } - trace('Child update addresses: ' + endpointList.map(endpoint => '(' + endpointToString(endpoint) + ' path=' + endpoint.localityPath + ')')); - trace('Child update priority config: ' + JSON.stringify(childConfig, undefined, 2)); - this.childBalancer.updateAddressList( - endpointList, - typedChildConfig, - this.latestAttributes - ); - } - - updateAddressList(addressList: Endpoint[], lbConfig: TypedLoadBalancingConfig, attributes: { [key: string]: unknown; }): void { - if (!(lbConfig instanceof XdsClusterResolverLoadBalancingConfig)) { - trace('Discarding address list update with unrecognized config ' + JSON.stringify(lbConfig, undefined, 2)); - return; - } - trace('Received update with config ' + JSON.stringify(lbConfig, undefined, 2)); - this.latestConfig = lbConfig; - this.latestAttributes = attributes; - this.xdsClient = attributes.xdsClient as XdsClient; - if (this.discoveryMechanismList.length === 0) { - for (const mechanism of lbConfig.getDiscoveryMechanisms()) { - const mechanismEntry: DiscoveryMechanismEntry = { - discoveryMechanism: mechanism, - localityPriorities: new Map(), - priorityNames: [], - nextPriorityChildNumber: 0 - }; - if (mechanism.type === 'EDS') { - const edsServiceName = mechanism.eds_service_name ?? mechanism.cluster; - const watcher: Watcher = new Watcher({ - onResourceChanged: update => { - mechanismEntry.latestUpdate = getEdsPriorities(update); - this.maybeUpdateChild(); - }, - onResourceDoesNotExist: () => { - trace('Resource does not exist: ' + edsServiceName); - mechanismEntry.latestUpdate = [{localities: [], dropCategories: []}]; - }, - onError: error => { - if (!mechanismEntry.latestUpdate) { - trace('xDS request failed with error ' + error); - mechanismEntry.latestUpdate = [{localities: [], dropCategories: []}]; - } - } - }); - mechanismEntry.watcher = watcher; - if (this.xdsClient) { - EndpointResourceType.startWatch(this.xdsClient, edsServiceName, watcher); - } - } else { - const resolver = createResolver({scheme: 'dns', path: mechanism.dns_hostname!}, { - onSuccessfulResolution: endpointList => { - mechanismEntry.latestUpdate = getDnsPriorities(endpointList); - this.maybeUpdateChild(); - }, - onError: error => { - if (!mechanismEntry.latestUpdate) { - trace('DNS resolution for ' + mechanism.dns_hostname + ' failed with error ' + error); - mechanismEntry.latestUpdate = [{localities: [], dropCategories: []}]; - } - } - }, {'grpc.service_config_disable_resolution': 1}); - mechanismEntry.resolver = resolver; - resolver.updateResolution(); - } - this.discoveryMechanismList.push(mechanismEntry); - } - } else { - /* The ChildLoadBalancerHandler subclass guarantees that each discovery - * mechanism in the new update corresponds to the same entry in the - * existing discoveryMechanismList, and that any differences will not - * result in changes to the watcher/resolver. */ - for (let i = 0; i < this.discoveryMechanismList.length; i++) { - this.discoveryMechanismList[i].discoveryMechanism = lbConfig.getDiscoveryMechanisms()[i]; - } - this.maybeUpdateChild(); - } - } - exitIdle(): void { - this.childBalancer.exitIdle(); - } - resetBackoff(): void { - this.childBalancer.resetBackoff(); - } - destroy(): void { - for (const mechanismEntry of this.discoveryMechanismList) { - if (mechanismEntry.watcher) { - const edsServiceName = mechanismEntry.discoveryMechanism.eds_service_name ?? mechanismEntry.discoveryMechanism.cluster; - if (this.xdsClient) { - EndpointResourceType.cancelWatch(this.xdsClient, edsServiceName, mechanismEntry.watcher); - } - } - mechanismEntry.resolver?.destroy(); - } - this.discoveryMechanismList = []; - this.childBalancer.destroy(); - } - getTypeName(): string { - return TYPE_NAME; - } -} - -function maybeServerConfigEqual(config1: XdsServerConfig | undefined, config2: XdsServerConfig | undefined) { - if (config1 !== undefined && config2 !== undefined) { - return serverConfigEqual(config1, config2); - } else { - return config1 === config2; - } -} - -export class XdsClusterResolverChildPolicyHandler extends ChildLoadBalancerHandler { - protected configUpdateRequiresNewPolicyInstance(oldConfig: TypedLoadBalancingConfig, newConfig: TypedLoadBalancingConfig): boolean { - if (!(oldConfig instanceof XdsClusterResolverLoadBalancingConfig && newConfig instanceof XdsClusterResolverLoadBalancingConfig)) { - return super.configUpdateRequiresNewPolicyInstance(oldConfig, newConfig); - } - if (oldConfig.getDiscoveryMechanisms().length !== newConfig.getDiscoveryMechanisms().length) { - return true; - } - for (let i = 0; i < oldConfig.getDiscoveryMechanisms().length; i++) { - const oldDiscoveryMechanism = oldConfig.getDiscoveryMechanisms()[i]; - const newDiscoveryMechanism = newConfig.getDiscoveryMechanisms()[i]; - if (oldDiscoveryMechanism.type !== newDiscoveryMechanism.type || - oldDiscoveryMechanism.cluster !== newDiscoveryMechanism.cluster || - oldDiscoveryMechanism.eds_service_name !== newDiscoveryMechanism.eds_service_name || - oldDiscoveryMechanism.dns_hostname !== newDiscoveryMechanism.dns_hostname || - !maybeServerConfigEqual(oldDiscoveryMechanism.lrs_load_reporting_server, newDiscoveryMechanism.lrs_load_reporting_server)) { - return true; - } - } - return false; - } -} - -export function setup() { - registerLoadBalancerType(TYPE_NAME, XdsClusterResolver, XdsClusterResolverLoadBalancingConfig); -} diff --git a/packages/grpc-js-xds/src/load-balancer-xds-wrr-locality.ts b/packages/grpc-js-xds/src/load-balancer-xds-wrr-locality.ts index b93f37a8..4a90d5cd 100644 --- a/packages/grpc-js-xds/src/load-balancer-xds-wrr-locality.ts +++ b/packages/grpc-js-xds/src/load-balancer-xds-wrr-locality.ts @@ -21,7 +21,7 @@ import { ChannelCredentials, ChannelOptions, LoadBalancingConfig, experimental, import { loadProtosWithOptionsSync } from "@grpc/proto-loader/build/src/util"; import { WeightedTargetRaw } from "./load-balancer-weighted-target"; import { isLocalityEndpoint } from "./load-balancer-priority"; -import { localityToName } from "./load-balancer-xds-cluster-resolver"; +import { localityToName } from "./load-balancer-cds"; import TypedLoadBalancingConfig = experimental.TypedLoadBalancingConfig; import LoadBalancer = experimental.LoadBalancer; import ChannelControlHelper = experimental.ChannelControlHelper; diff --git a/packages/grpc-js-xds/src/resolver-xds.ts b/packages/grpc-js-xds/src/resolver-xds.ts index 496e1efc..160d4f39 100644 --- a/packages/grpc-js-xds/src/resolver-xds.ts +++ b/packages/grpc-js-xds/src/resolver-xds.ts @@ -18,17 +18,13 @@ import * as protoLoader from '@grpc/proto-loader'; import { RE2 } from 're2-wasm'; -import { getSingletonXdsClient, Watcher, XdsClient } from './xds-client'; -import { StatusObject, status, logVerbosity, Metadata, experimental, ChannelOptions, ServiceConfig, LoadBalancingConfig, RetryPolicy } from '@grpc/grpc-js'; +import { getSingletonXdsClient, XdsClient } from './xds-client'; +import { status, logVerbosity, Metadata, experimental, ChannelOptions, ServiceConfig, LoadBalancingConfig, RetryPolicy } from '@grpc/grpc-js'; import Resolver = experimental.Resolver; import GrpcUri = experimental.GrpcUri; import ResolverListener = experimental.ResolverListener; import uriToString = experimental.uriToString; import registerResolver = experimental.registerResolver; -import { Listener__Output } from './generated/envoy/config/listener/v3/Listener'; -import { RouteConfiguration__Output } from './generated/envoy/config/route/v3/RouteConfiguration'; -import { HttpConnectionManager__Output } from './generated/envoy/extensions/filters/network/http_connection_manager/v3/HttpConnectionManager'; -import { VirtualHost__Output } from './generated/envoy/config/route/v3/VirtualHost'; import ConfigSelector = experimental.ConfigSelector; import { Matcher } from './matcher'; import { HashPolicy, RouteAction, SingleClusterRouteAction, WeightedCluster, WeightedClusterRouteAction } from './route-action'; @@ -40,12 +36,11 @@ import { EXPERIMENTAL_FAULT_INJECTION, EXPERIMENTAL_FEDERATION, EXPERIMENTAL_RET import Filter = experimental.Filter; import FilterFactory = experimental.FilterFactory; import { BootstrapInfo, loadBootstrapInfo, validateBootstrapConfig } from './xds-bootstrap'; -import { ListenerResourceType } from './xds-resource-type/listener-resource-type'; -import { RouteConfigurationResourceType } from './xds-resource-type/route-config-resource-type'; import { protoDurationToDuration } from './duration'; import { loadXxhashApi } from './xxhash'; import { formatTemplateString } from './xds-bootstrap'; import { getPredicateForMatcher } from './route'; +import { XdsConfig, XdsConfigWatcher, XdsDependencyManager } from './xds-dependency-manager'; const TRACER_NAME = 'xds_resolver'; @@ -53,86 +48,6 @@ function trace(text: string): void { experimental.trace(logVerbosity.DEBUG, TRACER_NAME, text); } -// Better match type has smaller value. -enum MatchType { - EXACT_MATCH, - SUFFIX_MATCH, - PREFIX_MATCH, - UNIVERSE_MATCH, - INVALID_MATCH, -}; - -function domainPatternMatchType(domainPattern: string): MatchType { - if (domainPattern.length === 0) { - return MatchType.INVALID_MATCH; - } - if (domainPattern.indexOf('*') < 0) { - return MatchType.EXACT_MATCH; - } - if (domainPattern === '*') { - return MatchType.UNIVERSE_MATCH; - } - if (domainPattern.startsWith('*')) { - return MatchType.SUFFIX_MATCH; - } - if (domainPattern.endsWith('*')) { - return MatchType.PREFIX_MATCH; - } - return MatchType.INVALID_MATCH; -} - -function domainMatch(matchType: MatchType, domainPattern: string, expectedHostName: string) { - switch (matchType) { - case MatchType.EXACT_MATCH: - return expectedHostName === domainPattern; - case MatchType.SUFFIX_MATCH: - return expectedHostName.endsWith(domainPattern.substring(1)); - case MatchType.PREFIX_MATCH: - return expectedHostName.startsWith(domainPattern.substring(0, domainPattern.length - 1)); - case MatchType.UNIVERSE_MATCH: - return true; - case MatchType.INVALID_MATCH: - return false; - } -} - -interface HasDomains { - domains: string[]; -} - -export function findVirtualHostForDomain(virutalHostList: T[], domain: string): T | null { - let targetVhost: T | null = null; - let bestMatchType: MatchType = MatchType.INVALID_MATCH; - let longestMatch = 0; - for (const virtualHost of virutalHostList) { - for (const domainPattern of virtualHost.domains) { - const matchType = domainPatternMatchType(domainPattern); - // If we already have a match of a better type, skip this one - if (matchType > bestMatchType) { - continue; - } - // If we already have a longer match of the same type, skip this one - if (matchType === bestMatchType && domainPattern.length <= longestMatch) { - continue; - } - if (domainMatch(matchType, domainPattern, domain)) { - targetVhost = virtualHost; - bestMatchType = matchType; - longestMatch = domainPattern.length; - } - if (bestMatchType === MatchType.EXACT_MATCH) { - break; - } - } - if (bestMatchType === MatchType.EXACT_MATCH) { - break; - } - } - return targetVhost; -} - -const numberRegex = new RE2(/^-?\d+$/u); - function protoDurationToSecondsString(duration: Duration__Output): string { return `${duration.seconds + duration.nanos / 1_000_000_000}s`; } @@ -166,30 +81,16 @@ const RETRY_CODES: {[key: string]: status} = { }; class XdsResolver implements Resolver { - private hasReportedSuccess = false; - private ldsWatcher: Watcher; - private rdsWatcher: Watcher - private isLdsWatcherActive = false; private listenerResourceName: string | null = null; - /** - * The latest route config name from an LDS response. The RDS watcher is - * actively watching that name if and only if this is not null. - */ - private latestRouteConfigName: string | null = null; - - private latestRouteConfig: RouteConfiguration__Output | null = null; - - private clusterRefcounts = new Map(); - - private latestDefaultTimeout: Duration | undefined = undefined; - - private ldsHttpFilterConfigs: {name: string, config: HttpFilterConfig}[] = []; private bootstrapInfo: BootstrapInfo | null = null; private xdsClient: XdsClient; + private xdsConfigWatcher: XdsConfigWatcher; + private xdsDependencyManager: XdsDependencyManager | null = null; + constructor( private target: GrpcUri, private listener: ResolverListener, @@ -202,116 +103,47 @@ class XdsResolver implements Resolver { } else { this.xdsClient = getSingletonXdsClient(); } - this.ldsWatcher = new Watcher({ - onResourceChanged: (update: Listener__Output) => { - const httpConnectionManager = decodeSingleResource(HTTP_CONNECTION_MANGER_TYPE_URL, update.api_listener!.api_listener!.value); - const defaultTimeout = httpConnectionManager.common_http_protocol_options?.idle_timeout; - if (defaultTimeout === null || defaultTimeout === undefined) { - this.latestDefaultTimeout = undefined; - } else { - this.latestDefaultTimeout = protoDurationToDuration(defaultTimeout); - } - if (EXPERIMENTAL_FAULT_INJECTION) { - this.ldsHttpFilterConfigs = []; - for (const filter of httpConnectionManager.http_filters) { - // typed_config must be set here, or validation would have failed - const filterConfig = parseTopLevelFilterConfig(filter.typed_config!); - if (filterConfig) { - this.ldsHttpFilterConfigs.push({name: filter.name, config: filterConfig}); - } - } - } - switch (httpConnectionManager.route_specifier) { - case 'rds': { - const routeConfigName = httpConnectionManager.rds!.route_config_name; - if (this.latestRouteConfigName !== routeConfigName) { - if (this.latestRouteConfigName !== null) { - RouteConfigurationResourceType.cancelWatch(this.xdsClient, this.latestRouteConfigName, this.rdsWatcher); - } - RouteConfigurationResourceType.startWatch(this.xdsClient, routeConfigName, this.rdsWatcher); - this.latestRouteConfigName = routeConfigName; - } - break; - } - case 'route_config': - if (this.latestRouteConfigName) { - RouteConfigurationResourceType.cancelWatch(this.xdsClient, this.latestRouteConfigName, this.rdsWatcher); - this.latestRouteConfigName = null; - } - this.handleRouteConfig(httpConnectionManager.route_config!); - break; - default: - // This is prevented by the validation rules - } + this.xdsConfigWatcher = { + onUpdate: xdsConfig => { + this.handleXdsConfig(xdsConfig); }, - onError: (error: StatusObject) => { - /* A transient error only needs to bubble up as a failure if we have - * not already provided a ServiceConfig for the upper layer to use */ - if (!this.hasReportedSuccess) { - trace('Resolution error for target ' + uriToString(this.target) + ' due to xDS client transient error ' + error.details); - this.reportResolutionError(error.details); - } + onError: (context, status) => { + trace('Resolution error for target ' + uriToString(this.target) + ' due to xDS client transient error retrieving ' + context + ': ' + status.details); + this.reportResolutionError(`Error retrieving resource ${context}: ${status.details}`); }, - onResourceDoesNotExist: () => { - trace('Resolution error for target ' + uriToString(this.target) + ': LDS resource does not exist'); - if (this.latestRouteConfigName) { - RouteConfigurationResourceType.cancelWatch(this.xdsClient, this.latestRouteConfigName, this.rdsWatcher); - this.latestRouteConfigName = null; - } - this.reportResolutionError(`Listener ${this.target} does not exist`); - } - }); - this.rdsWatcher = new Watcher({ - onResourceChanged: (update: RouteConfiguration__Output) => { - this.handleRouteConfig(update); - }, - onError: (error: StatusObject) => { - /* A transient error only needs to bubble up as a failure if we have - * not already provided a ServiceConfig for the upper layer to use */ - if (!this.hasReportedSuccess) { - trace('Resolution error for target ' + uriToString(this.target) + ' due to xDS client transient error ' + error.details); - this.reportResolutionError(error.details); - } - }, - onResourceDoesNotExist: () => { - trace('Resolution error for target ' + uriToString(this.target) + ' and route config ' + this.latestRouteConfigName + ': RDS resource does not exist'); - this.reportResolutionError(`Route config ${this.latestRouteConfigName} does not exist`); - } - }); - } - - private refCluster(clusterName: string) { - const refCount = this.clusterRefcounts.get(clusterName); - if (refCount) { - refCount.refCount += 1; - } - } - - private unrefCluster(clusterName: string) { - const refCount = this.clusterRefcounts.get(clusterName); - if (refCount) { - refCount.refCount -= 1; - if (!refCount.inLastConfig && refCount.refCount === 0) { - this.clusterRefcounts.delete(clusterName); - this.handleRouteConfig(this.latestRouteConfig!); + onResourceDoesNotExist: context => { + trace('Resolution error for target ' + uriToString(this.target) + ': ' + context + ' does not exist'); + /* Return an empty endpoint list and service config, to explicitly + * invalidate any previously returned service config */ + this.listener.onSuccessfulResolution([], null, null, null, {}); } } } - private async handleRouteConfig(routeConfig: RouteConfiguration__Output) { + private async handleXdsConfig(xdsConfig: XdsConfig) { /* We need to load the xxhash API before this function finishes, because * it is invoked in the config selector, which can be called immediately * after this function returns. */ await loadXxhashApi(); - this.latestRouteConfig = routeConfig; - /* Select the virtual host using the default authority override if it - * exists, and the channel target otherwise. */ - const hostDomain = this.channelOptions['grpc.default_authority'] ?? this.target.path; - const virtualHost = findVirtualHostForDomain(routeConfig.virtual_hosts, hostDomain); - if (virtualHost === null) { - this.reportResolutionError('No matching route found for ' + hostDomain); - return; + const httpConnectionManager = decodeSingleResource(HTTP_CONNECTION_MANGER_TYPE_URL, xdsConfig.listener.api_listener!.api_listener!.value); + const configDefaultTimeout = httpConnectionManager.common_http_protocol_options?.idle_timeout; + let defaultTimeout: Duration | undefined = undefined; + if (configDefaultTimeout === null || configDefaultTimeout === undefined) { + defaultTimeout = undefined; + } else { + defaultTimeout = protoDurationToDuration(configDefaultTimeout); } + const ldsHttpFilterConfigs: {name: string, config: HttpFilterConfig}[] = []; + if (EXPERIMENTAL_FAULT_INJECTION) { + for (const filter of httpConnectionManager.http_filters) { + // typed_config must be set here, or validation would have failed + const filterConfig = parseTopLevelFilterConfig(filter.typed_config!); + if (filterConfig) { + ldsHttpFilterConfigs.push({name: filter.name, config: filterConfig}); + } + } + } + const virtualHost = xdsConfig.virtualHost; const virtualHostHttpFilterOverrides = new Map(); if (EXPERIMENTAL_FAULT_INJECTION) { for (const [name, filter] of Object.entries(virtualHost.typed_per_filter_config ?? {})) { @@ -335,7 +167,7 @@ class XdsResolver implements Resolver { } else if (route.route?.max_stream_duration?.max_stream_duration) { timeout = protoDurationToDuration(route.route.max_stream_duration.max_stream_duration); } else { - timeout = this.latestDefaultTimeout; + timeout = defaultTimeout; } // "A value of 0 indicates the application's deadline is used without modification." if (timeout?.seconds === 0 && timeout.nanos === 0) { @@ -405,7 +237,7 @@ class XdsResolver implements Resolver { allConfigClusters.add(cluster); const extraFilterFactories: FilterFactory[] = []; if (EXPERIMENTAL_FAULT_INJECTION) { - for (const filterConfig of this.ldsHttpFilterConfigs) { + for (const filterConfig of ldsHttpFilterConfigs) { if (routeHttpFilterOverrides.has(filterConfig.name)) { const filter = createHttpFilter(filterConfig.config, routeHttpFilterOverrides.get(filterConfig.name)!); if (filter) { @@ -440,7 +272,7 @@ class XdsResolver implements Resolver { clusterHttpFilterOverrides.set(name, parsedConfig); } } - for (const filterConfig of this.ldsHttpFilterConfigs) { + for (const filterConfig of ldsHttpFilterConfigs) { if (clusterHttpFilterOverrides.has(filterConfig.name)) { const filter = createHttpFilter(filterConfig.config, clusterHttpFilterOverrides.get(filterConfig.name)!); if (filter) { @@ -477,31 +309,13 @@ class XdsResolver implements Resolver { const routeMatcher = getPredicateForMatcher(route.match!); matchList.push({matcher: routeMatcher, action: routeAction}); } - /* Mark clusters that are not in this route config, and remove ones with - * no references */ - for (const [name, refCount] of Array.from(this.clusterRefcounts.entries())) { - if (!allConfigClusters.has(name)) { - refCount.inLastConfig = false; - if (refCount.refCount === 0) { - this.clusterRefcounts.delete(name); - } - } - } - // Add any new clusters from this route config - for (const name of allConfigClusters) { - if (this.clusterRefcounts.has(name)) { - this.clusterRefcounts.get(name)!.inLastConfig = true; - } else { - this.clusterRefcounts.set(name, {inLastConfig: true, refCount: 0}); - } - } const configSelector: ConfigSelector = (methodName, metadata, channelId) => { for (const {matcher, action} of matchList) { if (matcher.apply(methodName, metadata)) { const clusterResult = action.getCluster(); - this.refCluster(clusterResult.name); + const unrefCluster = this.xdsDependencyManager!.addClusterSubscription(clusterResult.name); const onCommitted = () => { - this.unrefCluster(clusterResult.name); + unrefCluster(); } let hash: string; if (EXPERIMENTAL_RING_HASH) { @@ -532,7 +346,7 @@ class XdsResolver implements Resolver { trace('=> ' + action.toString()); } const clusterConfigMap: {[key: string]: {child_policy: LoadBalancingConfig[]}} = {}; - for (const clusterName of this.clusterRefcounts.keys()) { + for (const clusterName of allConfigClusters) { clusterConfigMap[clusterName] = {child_policy: [{cds: {cluster: clusterName}}]}; } const lbPolicyConfig = {xds_cluster_manager: {children: clusterConfigMap}}; @@ -540,7 +354,10 @@ class XdsResolver implements Resolver { methodConfig: [], loadBalancingConfig: [lbPolicyConfig] } - this.listener.onSuccessfulResolution([], serviceConfig, null, configSelector, {xdsClient: this.xdsClient}); + this.listener.onSuccessfulResolution([], serviceConfig, null, configSelector, { + xdsClient: this.xdsClient, + xdsConfig: xdsConfig + }); } private reportResolutionError(reason: string) { @@ -554,18 +371,19 @@ class XdsResolver implements Resolver { } private startResolution(): void { - if (!this.isLdsWatcherActive) { + if (!this.xdsDependencyManager) { trace('Starting resolution for target ' + uriToString(this.target)); try { - this.listenerResourceName = getListenerResourceName(this.bootstrapInfo!, this.target); + const listenerResourceName = getListenerResourceName(this.bootstrapInfo!, this.target); trace('Resolving target ' + uriToString(this.target) + ' with Listener resource name ' + this.listenerResourceName); - ListenerResourceType.startWatch(this.xdsClient, this.listenerResourceName, this.ldsWatcher); - this.isLdsWatcherActive = true; - + const hostDomain = this.channelOptions['grpc.default_authority'] ?? this.target.path; + this.xdsDependencyManager = new XdsDependencyManager(this.xdsClient, listenerResourceName, hostDomain, this.xdsConfigWatcher); } catch (e) { this.reportResolutionError((e as Error).message); + return; } } + this.xdsDependencyManager.updateResolution(); } updateResolution(): void { @@ -581,23 +399,19 @@ class XdsResolver implements Resolver { this.startResolution(); } } else { - if (!this.isLdsWatcherActive) { + if (!this.xdsDependencyManager) { trace('Starting resolution for target ' + uriToString(this.target)); - ListenerResourceType.startWatch(this.xdsClient, this.target.path, this.ldsWatcher); - this.listenerResourceName = this.target.path; - this.isLdsWatcherActive = true; + const hostDomain = this.channelOptions['grpc.default_authority'] ?? this.target.path; + this.xdsDependencyManager = new XdsDependencyManager(this.xdsClient, this.target.path, hostDomain, this.xdsConfigWatcher); } + this.xdsDependencyManager.updateResolution(); } } destroy() { - if (this.listenerResourceName) { - ListenerResourceType.cancelWatch(this.xdsClient, this.listenerResourceName, this.ldsWatcher); - this.isLdsWatcherActive = false; - } - if (this.latestRouteConfigName) { - RouteConfigurationResourceType.cancelWatch(this.xdsClient, this.latestRouteConfigName, this.rdsWatcher); - this.latestRouteConfigName = null; + if (this.xdsDependencyManager) { + this.xdsDependencyManager.destroy(); + this.xdsDependencyManager = null; } } diff --git a/packages/grpc-js-xds/src/server.ts b/packages/grpc-js-xds/src/server.ts index d3c42eee..29cd91f0 100644 --- a/packages/grpc-js-xds/src/server.ts +++ b/packages/grpc-js-xds/src/server.ts @@ -33,7 +33,7 @@ import { DOWNSTREAM_TLS_CONTEXT_TYPE_URL, HTTP_CONNECTION_MANGER_TYPE_URL, decod import { FilterChain__Output } from "./generated/envoy/config/listener/v3/FilterChain"; import { getPredicateForMatcher } from "./route"; import { crossProduct } from "./cross-product"; -import { findVirtualHostForDomain } from "./resolver-xds"; +import { findVirtualHostForDomain } from "./xds-dependency-manager"; import { LogVerbosity } from "@grpc/grpc-js/build/src/constants"; import { XdsServerCredentials } from "./xds-credentials"; import { CertificateValidationContext__Output } from "./generated/envoy/extensions/transport_sockets/tls/v3/CertificateValidationContext"; diff --git a/packages/grpc-js-xds/src/xds-dependency-manager.ts b/packages/grpc-js-xds/src/xds-dependency-manager.ts new file mode 100644 index 00000000..e455929f --- /dev/null +++ b/packages/grpc-js-xds/src/xds-dependency-manager.ts @@ -0,0 +1,763 @@ +/* + * Copyright 2024 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import { experimental, logVerbosity, Metadata, status, StatusObject } from "@grpc/grpc-js"; +import { Listener__Output } from "./generated/envoy/config/listener/v3/Listener"; +import { RouteConfiguration__Output } from "./generated/envoy/config/route/v3/RouteConfiguration"; +import { VirtualHost__Output } from "./generated/envoy/config/route/v3/VirtualHost"; +import { CdsUpdate, ClusterResourceType } from "./xds-resource-type/cluster-resource-type"; +import { Watcher, XdsClient } from "./xds-client"; +import { Locality__Output } from "./generated/envoy/config/core/v3/Locality"; +import { DropCategory } from "./load-balancer-xds-cluster-impl"; +import Endpoint = experimental.Endpoint; +import Resolver = experimental.Resolver; +import createResolver = experimental.createResolver; +import { decodeSingleResource, HTTP_CONNECTION_MANGER_TYPE_URL } from "./resources"; +import { RouteConfigurationResourceType } from "./xds-resource-type/route-config-resource-type"; +import { ListenerResourceType } from "./xds-resource-type/listener-resource-type"; +import { ClusterLoadAssignment__Output } from "./generated/envoy/config/endpoint/v3/ClusterLoadAssignment"; +import { EndpointResourceType } from "./xds-resource-type/endpoint-resource-type"; +import { SocketAddress__Output } from "./generated/envoy/config/core/v3/SocketAddress"; +import { EXPERIMENTAL_DUALSTACK_ENDPOINTS } from "./environment"; + +const TRACER_NAME = 'xds_resolver'; + +function trace(text: string): void { + experimental.trace(logVerbosity.DEBUG, TRACER_NAME, text); +} + +interface WeightedEndpoint { + endpoint: Endpoint; + weight: number; +} + +interface LocalityEntry { + locality: Locality__Output; + weight: number; + endpoints: WeightedEndpoint[]; +} + +interface PriorityEntry { + localities: LocalityEntry[]; +} + +interface EndpointResource { + priorities: PriorityEntry[]; + dropCategories: DropCategory[]; +} + +export interface EndpointConfig { + type: 'endpoint'; + endpoints?: EndpointResource; + resolutionNote?: string; +} + +export interface AggregateConfig { + type: 'aggregate'; + leafClusters: string[]; +} + +export interface ClusterConfig { + cluster: CdsUpdate; + children: EndpointConfig | AggregateConfig; +} + +export type StatusOr = { + success: true; + value: T +} | { + success: false; + error: StatusObject; +} + +export interface ClusterResult { + clusterConfig?: ClusterConfig; + status?: StatusObject; +} + +export interface XdsConfig { + listener: Listener__Output; + routeConfig: RouteConfiguration__Output; + virtualHost: VirtualHost__Output; + clusters: Map>; +} + +export interface XdsConfigWatcher { + onUpdate(xdsConfig: XdsConfig): void; + onError(context: string, status: StatusObject): void; + onResourceDoesNotExist(context: string): void; +} + +interface AggregateClusterInfo { + type: 'AGGREGATE'; + cdsUpdate: CdsUpdate; +} + +interface EdsClusterInfo { + type: 'EDS'; + cdsUpdate: CdsUpdate; + edsServiceName: string; + watcher: Watcher; + latestUpdate?: EndpointResource; + resolutionNote?: string; +} + +interface LogicalDnsClusterInfo { + type: 'LOGICAL_DNS'; + cdsUpdate: CdsUpdate; + dnsHostname: string; + resolver: Resolver; + latestUpdate?: EndpointResource; + resolutionNote?: string; +} + +type ClusterInfo = AggregateClusterInfo | EdsClusterInfo | LogicalDnsClusterInfo; + +interface ClusterEntry { + watcher: Watcher; + latestUpdate?: StatusOr; + children: string[]; +} + +interface ClusterGraph { + [name: string]: ClusterEntry; +} + +function isClusterTreeFullyUpdated(tree: ClusterGraph, roots: string[]): boolean { + const toCheck: string[] = [...roots]; + const visited = new Set(); + while (toCheck.length > 0) { + const next = toCheck.shift()!; + if (visited.has(next)) { + continue; + } + visited.add(next); + if (!tree[next] || !tree[next].latestUpdate) { + return false; + } + if (tree[next].latestUpdate.success) { + if (tree[next].latestUpdate.value.type !== 'AGGREGATE') { + if (!(tree[next].latestUpdate.value.latestUpdate || tree[next].latestUpdate.value.latestUpdate)) { + return false; + } + } + } + toCheck.push(...tree[next].children); + } + return true; +} + +// Better match type has smaller value. +enum MatchType { + EXACT_MATCH, + SUFFIX_MATCH, + PREFIX_MATCH, + UNIVERSE_MATCH, + INVALID_MATCH, +}; + +function domainPatternMatchType(domainPattern: string): MatchType { + if (domainPattern.length === 0) { + return MatchType.INVALID_MATCH; + } + if (domainPattern.indexOf('*') < 0) { + return MatchType.EXACT_MATCH; + } + if (domainPattern === '*') { + return MatchType.UNIVERSE_MATCH; + } + if (domainPattern.startsWith('*')) { + return MatchType.SUFFIX_MATCH; + } + if (domainPattern.endsWith('*')) { + return MatchType.PREFIX_MATCH; + } + return MatchType.INVALID_MATCH; +} + +function domainMatch(matchType: MatchType, domainPattern: string, expectedHostName: string) { + switch (matchType) { + case MatchType.EXACT_MATCH: + return expectedHostName === domainPattern; + case MatchType.SUFFIX_MATCH: + return expectedHostName.endsWith(domainPattern.substring(1)); + case MatchType.PREFIX_MATCH: + return expectedHostName.startsWith(domainPattern.substring(0, domainPattern.length - 1)); + case MatchType.UNIVERSE_MATCH: + return true; + case MatchType.INVALID_MATCH: + return false; + } +} + +interface HasDomains { + domains: string[]; +} + +export function findVirtualHostForDomain(virutalHostList: T[], domain: string): T | null { + let targetVhost: T | null = null; + let bestMatchType: MatchType = MatchType.INVALID_MATCH; + let longestMatch = 0; + for (const virtualHost of virutalHostList) { + for (const domainPattern of virtualHost.domains) { + const matchType = domainPatternMatchType(domainPattern); + // If we already have a match of a better type, skip this one + if (matchType > bestMatchType) { + continue; + } + // If we already have a longer match of the same type, skip this one + if (matchType === bestMatchType && domainPattern.length <= longestMatch) { + continue; + } + if (domainMatch(matchType, domainPattern, domain)) { + targetVhost = virtualHost; + bestMatchType = matchType; + longestMatch = domainPattern.length; + } + if (bestMatchType === MatchType.EXACT_MATCH) { + break; + } + } + if (bestMatchType === MatchType.EXACT_MATCH) { + break; + } + } + return targetVhost; +} + +function getEdsResource(edsUpdate: ClusterLoadAssignment__Output): EndpointResource { + const result: PriorityEntry[] = []; + const dropCategories: DropCategory[] = []; + if (edsUpdate.policy) { + for (const dropOverload of edsUpdate.policy.drop_overloads) { + if (!dropOverload.drop_percentage) { + continue; + } + let requestsPerMillion: number; + switch (dropOverload.drop_percentage.denominator) { + case 'HUNDRED': + requestsPerMillion = dropOverload.drop_percentage.numerator * 10_000; + break; + case 'TEN_THOUSAND': + requestsPerMillion = dropOverload.drop_percentage.numerator * 100; + break; + case 'MILLION': + requestsPerMillion = dropOverload.drop_percentage.numerator; + break; + } + dropCategories.push({ + category: dropOverload.category, + requests_per_million: requestsPerMillion + }); + } + } + for (const endpoint of edsUpdate.endpoints) { + if (!endpoint.load_balancing_weight) { + continue; + } + const endpoints: WeightedEndpoint[] = endpoint.lb_endpoints.filter(lbEndpoint => lbEndpoint.health_status === 'UNKNOWN' || lbEndpoint.health_status === 'HEALTHY').map( + (lbEndpoint) => { + /* The validator in the XdsClient class ensures that each endpoint has + * a socket_address with an IP address and a port_value. */ + let socketAddresses: SocketAddress__Output[]; + if (EXPERIMENTAL_DUALSTACK_ENDPOINTS) { + socketAddresses = [ + lbEndpoint.endpoint!.address!.socket_address!, + ...lbEndpoint.endpoint!.additional_addresses.map(additionalAddress => additionalAddress.address!.socket_address!) + ]; + } else { + socketAddresses = [lbEndpoint.endpoint!.address!.socket_address!]; + } + return { + endpoint: { + addresses: socketAddresses.map(socketAddress => ({ + host: socketAddress.address!, + port: socketAddress.port_value! + })) + }, + weight: lbEndpoint.load_balancing_weight?.value ?? 1 + }; + } + ); + if (endpoints.length === 0) { + continue; + } + let priorityEntry: PriorityEntry; + if (result[endpoint.priority]) { + priorityEntry = result[endpoint.priority]; + } else { + priorityEntry = { + localities: [] + }; + result[endpoint.priority] = priorityEntry; + } + priorityEntry.localities.push({ + locality: endpoint.locality!, + endpoints: endpoints, + weight: endpoint.load_balancing_weight.value + }); + } + // Collapse spaces in sparse array + return { + priorities: result.filter(priority => priority), + dropCategories: dropCategories + }; +} + +function getDnsResource(endpoints: Endpoint[]): EndpointResource { + return { + priorities: [{ + localities: [{ + locality: { + region: '', + zone: '', + sub_zone: '' + }, + weight: 1, + endpoints: endpoints.map(endpoint => ({endpoint: endpoint, weight: 1})) + }] + }], + dropCategories: [] + } +} + +export class XdsDependencyManager { + private ldsWatcher: Watcher; + private rdsWatcher: Watcher; + private latestListener: Listener__Output | null = null; + private latestRouteConfigName: string | null = null; + private latestRouteConfiguration: RouteConfiguration__Output | null = null; + private clusterRoots: string[] = []; + private subscribedClusters: {[cluster: string]: number} = {}; + private clusterForest: ClusterGraph = {}; + constructor(private xdsClient: XdsClient, private listenerResourceName: string, private dataPlaneAuthority: string, private watcher: XdsConfigWatcher) { + this.ldsWatcher = new Watcher({ + onResourceChanged: (update: Listener__Output) => { + this.latestListener = update; + const httpConnectionManager = decodeSingleResource(HTTP_CONNECTION_MANGER_TYPE_URL, update.api_listener!.api_listener!.value); + switch (httpConnectionManager.route_specifier) { + case 'rds': { + const routeConfigName = httpConnectionManager.rds!.route_config_name; + if (this.latestRouteConfigName !== routeConfigName) { + if (this.latestRouteConfigName !== null) { + RouteConfigurationResourceType.cancelWatch(this.xdsClient, this.latestRouteConfigName, this.rdsWatcher); + this.latestRouteConfiguration = null; + this.clusterRoots = []; + this.pruneOrphanClusters(); + } + RouteConfigurationResourceType.startWatch(this.xdsClient, routeConfigName, this.rdsWatcher); + this.latestRouteConfigName = routeConfigName; + } + break; + } + case 'route_config': + if (this.latestRouteConfigName) { + RouteConfigurationResourceType.cancelWatch(this.xdsClient, this.latestRouteConfigName, this.rdsWatcher); + this.latestRouteConfigName = null; + } + this.handleRouteConfig(httpConnectionManager.route_config!); + break; + default: + // This is prevented by the validation rules + } + }, + onError: (error: StatusObject) => { + /* A transient error only needs to bubble up as a failure if we have + * not already provided a ServiceConfig for the upper layer to use */ + if (!this.latestListener) { + trace('Resolution error for target ' + listenerResourceName + ' due to xDS client transient error ' + error.details); + this.watcher.onError(`Listener ${listenerResourceName}`, error); + } + }, + onResourceDoesNotExist: () => { + trace('Resolution error for target ' + listenerResourceName + ': LDS resource does not exist'); + if (this.latestRouteConfigName) { + RouteConfigurationResourceType.cancelWatch(this.xdsClient, this.latestRouteConfigName, this.rdsWatcher); + this.latestRouteConfigName = null; + this.latestRouteConfiguration = null; + this.clusterRoots = []; + this.pruneOrphanClusters(); + } + this.watcher.onResourceDoesNotExist(`Listener ${listenerResourceName}`); + } + }); + this.rdsWatcher = new Watcher({ + onResourceChanged: (update: RouteConfiguration__Output) => { + this.handleRouteConfig(update); + }, + onError: (error: StatusObject) => { + if (!this.latestRouteConfiguration) { + this.watcher.onError(`RouteConfiguration ${this.latestRouteConfigName}`, error); + } + }, + onResourceDoesNotExist: () => { + this.watcher.onResourceDoesNotExist(`RouteConfiguration ${this.latestRouteConfigName}`); + this.clusterRoots = []; + this.pruneOrphanClusters(); + } + }); + ListenerResourceType.startWatch(this.xdsClient, listenerResourceName, this.ldsWatcher); + } + + private maybeSendUpdate() { + if (!(this.latestListener && this.latestRouteConfiguration && isClusterTreeFullyUpdated(this.clusterForest, this.clusterRoots))) { + return; + } + const update: XdsConfig = { + listener: this.latestListener, + routeConfig: this.latestRouteConfiguration, + virtualHost: findVirtualHostForDomain(this.latestRouteConfiguration.virtual_hosts, this.dataPlaneAuthority)!, + clusters: new Map() + }; + for (const [clusterName, entry] of Object.entries(this.clusterForest)) { + if (!entry.latestUpdate) { + return; + } + if (entry.latestUpdate.success) { + let clusterChildren: EndpointConfig | AggregateConfig; + if (entry.latestUpdate.value.type === 'AGGREGATE') { + clusterChildren = { + type: 'aggregate', + leafClusters: entry.children + }; + } else { + clusterChildren = { + type: 'endpoint', + endpoints: entry.latestUpdate.value.latestUpdate ? entry.latestUpdate.value.latestUpdate : undefined, + resolutionNote: entry.latestUpdate.value.resolutionNote + }; + } + update.clusters.set(clusterName, { + success: true, + value: { + cluster: entry.latestUpdate.value.cdsUpdate, + children: clusterChildren + } + }); + } else { + update.clusters.set(clusterName, { + success: false, + error: entry.latestUpdate.error + }); + } + } + this.watcher.onUpdate(update); + } + + private addCluster(clusterName: string) { + if (clusterName in this.clusterForest) { + return; + } + const entry: ClusterEntry = { + watcher: new Watcher({ + onResourceChanged: (update: CdsUpdate) => { + switch (update.type) { + case 'AGGREGATE': + if (entry.latestUpdate?.success) { + switch (entry.latestUpdate.value.type) { + case 'AGGREGATE': + break; + case 'EDS': + EndpointResourceType.cancelWatch(this.xdsClient, entry.latestUpdate.value.edsServiceName, entry.latestUpdate.value.watcher); + break; + case 'LOGICAL_DNS': + entry.latestUpdate.value.resolver.destroy(); + break; + } + } + entry.children = update.aggregateChildren; + entry.latestUpdate = { + success: true, + value: { + type: 'AGGREGATE', + cdsUpdate: update + } + } + for (const child of update.aggregateChildren) { + this.addCluster(child); + } + this.pruneOrphanClusters(); + this.maybeSendUpdate(); + break; + case 'EDS': + const edsServiceName = update.edsServiceName ?? clusterName; + if (entry.latestUpdate?.success) { + switch (entry.latestUpdate.value.type) { + case 'AGGREGATE': + entry.children = []; + this.pruneOrphanClusters(); + break; + case 'EDS': + // If the names are the same, keep the watch + if (entry.latestUpdate.value.edsServiceName !== edsServiceName) { + EndpointResourceType.cancelWatch(this.xdsClient, entry.latestUpdate.value.edsServiceName, entry.latestUpdate.value.watcher); + EndpointResourceType.startWatch(this.xdsClient, edsServiceName, entry.latestUpdate.value.watcher); + entry.latestUpdate.value.edsServiceName = edsServiceName; + entry.latestUpdate.value.latestUpdate = undefined; + entry.latestUpdate.value.resolutionNote = undefined; + } + entry.latestUpdate.value.cdsUpdate = update; + this.maybeSendUpdate(); + return; + case 'LOGICAL_DNS': + entry.latestUpdate.value.resolver.destroy(); + break; + } + } + const edsWatcher = new Watcher({ + onResourceChanged: (endpoint: ClusterLoadAssignment__Output) => { + if (entry.latestUpdate?.success && entry.latestUpdate.value.type === 'EDS') { + entry.latestUpdate.value.latestUpdate = getEdsResource(endpoint); + entry.latestUpdate.value.resolutionNote = undefined; + this.maybeSendUpdate(); + } + }, + onError: error => { + if (entry.latestUpdate?.success && entry.latestUpdate.value.type === 'EDS') { + if (!entry.latestUpdate.value.latestUpdate) { + entry.latestUpdate.value.resolutionNote = `Control plane error: ${error.details}`; + this.maybeSendUpdate(); + } + } + }, + onResourceDoesNotExist: () => { + if (entry.latestUpdate?.success && entry.latestUpdate.value.type === 'EDS') { + entry.latestUpdate.value.resolutionNote = 'Resource does not exist'; + entry.latestUpdate.value.latestUpdate = undefined; + this.maybeSendUpdate(); + } + } + }); + entry.latestUpdate = { + success: true, + value: { + type: 'EDS', + cdsUpdate: update, + edsServiceName: edsServiceName, + watcher: edsWatcher + } + }; + EndpointResourceType.startWatch(this.xdsClient, edsServiceName, edsWatcher); + this.maybeSendUpdate(); + break; + case 'LOGICAL_DNS': { + if (entry.latestUpdate?.success) { + switch (entry.latestUpdate.value.type) { + case 'AGGREGATE': + entry.children = []; + this.pruneOrphanClusters(); + break; + case 'EDS': + EndpointResourceType.cancelWatch(this.xdsClient, entry.latestUpdate.value.edsServiceName, entry.latestUpdate.value.watcher); + break; + case 'LOGICAL_DNS': + if (entry.latestUpdate.value.dnsHostname === update.dnsHostname) { + entry.latestUpdate.value.cdsUpdate = update; + this.maybeSendUpdate(); + return; + } + } + } + trace('Creating DNS resolver'); + const resolver = createResolver({scheme: 'dns', path: update.dnsHostname!}, { + onSuccessfulResolution: endpointList => { + if (entry.latestUpdate?.success && entry.latestUpdate.value.type === 'LOGICAL_DNS') { + entry.latestUpdate.value.latestUpdate = getDnsResource(endpointList); + this.maybeSendUpdate(); + } + }, + onError: error => { + if (entry.latestUpdate?.success && entry.latestUpdate.value.type === 'LOGICAL_DNS') { + if (!entry.latestUpdate.value.latestUpdate) { + entry.latestUpdate.value.resolutionNote = `DNS resolution error: ${error.details}`; + this.maybeSendUpdate(); + } + } + } + }, {'grpc.service_config_disable_resolution': 1}); + entry.latestUpdate = { + success: true, + value: { + type: 'LOGICAL_DNS', + cdsUpdate: update, + dnsHostname: update.dnsHostname!, + resolver: resolver + } + } + resolver.updateResolution(); + this.maybeSendUpdate(); + break; + } + } + }, + onError: error => { + if (!entry.latestUpdate?.success) { + entry.latestUpdate = { + success: false, + error: error + }; + this.maybeSendUpdate(); + } + }, + onResourceDoesNotExist: () => { + if (entry.latestUpdate?.success) { + switch (entry.latestUpdate.value.type) { + case 'EDS': + EndpointResourceType.cancelWatch(this.xdsClient, entry.latestUpdate.value.edsServiceName, entry.latestUpdate.value.watcher); + break; + case 'LOGICAL_DNS': + entry.latestUpdate.value.resolver.destroy(); + break; + default: + break; + } + } + entry.latestUpdate = { + success: false, + error: { + code: status.UNAVAILABLE, + details: `Cluster resource ${clusterName} does not exist`, + metadata: new Metadata() + } + }; + this.maybeSendUpdate(); + } + }), + children: [] + } + this.clusterForest[clusterName] = entry; + ClusterResourceType.startWatch(this.xdsClient, clusterName, entry.watcher); + } + + addClusterSubscription(clusterName: string) { + this.subscribedClusters[clusterName] = (this.subscribedClusters[clusterName] ?? 0) + 1; + this.addCluster(clusterName); + let removeFunctionCalled = false; + return () => { + if (!removeFunctionCalled) { + removeFunctionCalled = true; + if (clusterName in this.subscribedClusters) { + this.subscribedClusters[clusterName] -= 1; + if (this.subscribedClusters[clusterName] <= 0) { + delete this.subscribedClusters[clusterName]; + this.pruneOrphanClusters(); + this.maybeSendUpdate(); + } + } + } + }; + } + + private removeCluster(clusterName: string) { + if (!(clusterName in this.clusterForest)) { + return; + } + const entry = this.clusterForest[clusterName]; + if (entry.latestUpdate?.success) { + switch (entry.latestUpdate.value.type) { + case 'EDS': + EndpointResourceType.cancelWatch(this.xdsClient, entry.latestUpdate.value.edsServiceName, entry.latestUpdate.value.watcher); + break; + case 'LOGICAL_DNS': + entry.latestUpdate.value.resolver.destroy(); + break; + default: + break; + } + } + ClusterResourceType.cancelWatch(this.xdsClient, clusterName, entry.watcher); + delete this.clusterForest[clusterName]; + } + + private pruneOrphanClusters() { + const toCheck = [...this.clusterRoots, ...Object.keys(this.subscribedClusters)]; + const visited = new Set(); + while(toCheck.length > 0) { + const next = toCheck.shift()!; + if (visited.has(next)) { + continue; + } + if (next in this.clusterForest) { + toCheck.push(...this.clusterForest[next].children); + } + visited.add(next); + } + for (const clusterName of Object.keys(this.clusterForest)) { + if (!visited.has(clusterName)) { + this.removeCluster(clusterName); + } + } + } + + private handleRouteConfig(routeConfig: RouteConfiguration__Output) { + this.latestRouteConfiguration = routeConfig; + const virtualHost = findVirtualHostForDomain(routeConfig.virtual_hosts, this.dataPlaneAuthority); + if (!virtualHost) { + this.clusterRoots = []; + this.pruneOrphanClusters(); + this.watcher.onError(`RouteConfiguration ${routeConfig.name}`, { + code: status.UNAVAILABLE, + details: `No matching route found for ${this.dataPlaneAuthority}`, + metadata: new Metadata() + }); + // Report error + return; + } + const allConfigClusters = new Set(); + for (const route of virtualHost.routes) { + switch(route.route!.cluster_specifier) { + case 'cluster_header': + break; + case 'cluster': + allConfigClusters.add(route.route!.cluster!); + break; + case 'weighted_clusters': + for (const clusterWeight of route.route!.weighted_clusters!.clusters) { + allConfigClusters.add(clusterWeight.name); + } + break; + default: + /* The validation logic should prevent us from reaching this point. + * This is just for the type checker. */ + break; + } + } + this.clusterRoots = [...allConfigClusters]; + this.pruneOrphanClusters(); + for (const clusterName of this.clusterRoots) { + this.addCluster(clusterName); + } + this.maybeSendUpdate(); + } + + updateResolution() { + for (const clusterEntry of Object.values(this.clusterForest)) { + if (clusterEntry.latestUpdate?.success && clusterEntry.latestUpdate.value.type === 'LOGICAL_DNS') { + clusterEntry.latestUpdate.value.resolver.updateResolution(); + } + } + } + + destroy() { + ListenerResourceType.cancelWatch(this.xdsClient, this.listenerResourceName, this.ldsWatcher); + if (this.latestRouteConfigName) { + RouteConfigurationResourceType.cancelWatch(this.xdsClient, this.latestRouteConfigName, this.rdsWatcher); + } + this.clusterRoots = []; + this.subscribedClusters = {}; + this.pruneOrphanClusters(); + } +} diff --git a/packages/grpc-js-xds/test/test-confg-parsing.ts b/packages/grpc-js-xds/test/test-confg-parsing.ts index c185c852..c52b0f47 100644 --- a/packages/grpc-js-xds/test/test-confg-parsing.ts +++ b/packages/grpc-js-xds/test/test-confg-parsing.ts @@ -66,186 +66,21 @@ const allTestCases: {[lbPolicyName: string]: TestCase[]} = { error: /string.*cluster/ } ], - xds_cluster_resolver: [ - { - name: 'empty fields', - input: { - discovery_mechanisms: [], - xds_lb_policy: [] - } - }, - { - name: 'missing discovery_mechanisms', - input: { - xds_lb_policy: [] - }, - error: /discovery_mechanisms/ - }, - { - name: 'missing xds_lb_policy', - input: { - discovery_mechanisms: [] - }, - error: /xds_lb_policy/ - }, - { - name: 'discovery_mechanism: EDS', - input: { - discovery_mechanisms: [{ - cluster: 'abc', - type: 'EDS' - }], - xds_lb_policy: [] - }, - output: { - discovery_mechanisms: [{ - cluster: 'abc', - type: 'EDS', - lrs_load_reporting_server: undefined - }], - xds_lb_policy: [] - } - }, - { - name: 'discovery_mechanism: LOGICAL_DNS', - input: { - discovery_mechanisms: [{ - cluster: 'abc', - type: 'LOGICAL_DNS' - }], - xds_lb_policy: [] - }, - output: { - discovery_mechanisms: [{ - cluster: 'abc', - type: 'LOGICAL_DNS', - lrs_load_reporting_server: undefined - }], - xds_lb_policy: [] - } - }, - { - name: 'discovery_mechanism: undefined optional fields', - input: { - discovery_mechanisms: [{ - cluster: 'abc', - type: 'EDS', - max_concurrent_requests: undefined, - eds_service_name: undefined, - dns_hostname: undefined, - lrs_load_reporting_server: undefined - }], - xds_lb_policy: [] - } - }, - { - name: 'discovery_mechanism: populated optional fields', - input: { - discovery_mechanisms: [{ - cluster: 'abc', - type: 'EDS', - max_concurrent_requests: 100, - eds_service_name: 'def', - dns_hostname: 'localhost', - lrs_load_reporting_server: { - server_uri: 'localhost:12345', - channel_creds: [{ - type: 'google_default', - config: {} - }], - server_features: ['test'] - } - }], - xds_lb_policy: [] - } - } - ], xds_cluster_impl: [ { - name: 'only required fields', + name: 'required fields', input: { cluster: 'abc', - eds_service_name: 'def', - drop_categories: [], - lrs_load_reporting_server: { - server_uri: 'localhost:12345', - channel_creds: [{ - type: 'google_default', - config: {} - }], - server_features: ['test'] - }, + child_policy: [{round_robin: {}}] + } + }, + { + name: 'non-string cluster', + input: { + cluster: 123, child_policy: [{round_robin: {}}] }, - output: { - cluster: 'abc', - eds_service_name: 'def', - drop_categories: [], - lrs_load_reporting_server: { - server_uri: 'localhost:12345', - channel_creds: [{ - type: 'google_default', - config: {} - }], - server_features: ['test'] - }, - child_policy: [{round_robin: {}}], - max_concurrent_requests: 1024 - } - }, - { - name: 'undefined optional fields', - input: { - cluster: 'abc', - eds_service_name: 'def', - drop_categories: [], - lrs_load_reporting_server: { - server_uri: 'localhost:12345', - channel_creds: [{ - type: 'google_default', - config: {} - }], - server_features: ['test'] - }, - child_policy: [{round_robin: {}}], - max_concurrent_requests: undefined - }, - output: { - cluster: 'abc', - eds_service_name: 'def', - drop_categories: [], - lrs_load_reporting_server: { - server_uri: 'localhost:12345', - channel_creds: [{ - type: 'google_default', - config: {} - }], - server_features: ['test'] - }, - child_policy: [{round_robin: {}}], - max_concurrent_requests: 1024 - } - }, - { - name: 'populated optional fields', - input: { - cluster: 'abc', - eds_service_name: 'def', - drop_categories: [{ - category: 'test', - requests_per_million: 100 - }], - lrs_load_reporting_server: { - server_uri: 'localhost:12345', - channel_creds: [{ - type: 'google_default', - config: {} - }], - server_features: ['test'] - }, - child_policy: [{round_robin: {}}], - max_concurrent_requests: 123 - }, + error: /string.*cluster/ } ], priority: [ diff --git a/packages/grpc-js/src/load-balancer-outlier-detection.ts b/packages/grpc-js/src/load-balancer-outlier-detection.ts index 03bda8c2..1658e43f 100644 --- a/packages/grpc-js/src/load-balancer-outlier-detection.ts +++ b/packages/grpc-js/src/load-balancer-outlier-detection.ts @@ -770,6 +770,7 @@ export class OutlierDetectionLoadBalancer implements LoadBalancer { if (!(lbConfig instanceof OutlierDetectionLoadBalancingConfig)) { return; } + trace('Received update with config: ' + JSON.stringify(lbConfig.toJsonObject(), undefined, 2)) for (const endpoint of endpointList) { if (!this.entryMap.has(endpoint)) { trace('Adding map entry for ' + endpointToString(endpoint)); diff --git a/packages/grpc-js/src/load-balancer-pick-first.ts b/packages/grpc-js/src/load-balancer-pick-first.ts index 7ef31548..475d796e 100644 --- a/packages/grpc-js/src/load-balancer-pick-first.ts +++ b/packages/grpc-js/src/load-balancer-pick-first.ts @@ -139,6 +139,9 @@ export function shuffled(list: T[]): T[] { function interleaveAddressFamilies( addressList: SubchannelAddress[] ): SubchannelAddress[] { + if (addressList.length === 0) { + return []; + } const result: SubchannelAddress[] = []; const ipv6Addresses: SubchannelAddress[] = []; const ipv4Addresses: SubchannelAddress[] = []; @@ -507,7 +510,7 @@ export class PickFirstLoadBalancer implements LoadBalancer { ); trace('updateAddressList([' + rawAddressList.map(address => subchannelAddressToString(address)) + '])'); if (rawAddressList.length === 0) { - throw new Error('No addresses in endpoint list passed to pick_first'); + this.lastError = 'No addresses resolved'; } const addressList = interleaveAddressFamilies(rawAddressList); this.latestAddressList = addressList;