From 49b7c6af34cb7454ed0d4a85c3654db97a3cb6c4 Mon Sep 17 00:00:00 2001 From: Michael Lumish Date: Wed, 30 Aug 2023 14:46:08 -0700 Subject: [PATCH 1/4] grpc-js: Make pick_first the universal leaf policy, switch to endpoint lists --- packages/grpc-js/src/experimental.ts | 6 +- .../src/load-balancer-child-handler.ts | 8 +- .../src/load-balancer-outlier-detection.ts | 287 +++++++++++------- .../grpc-js/src/load-balancer-pick-first.ts | 148 +++++++-- .../grpc-js/src/load-balancer-round-robin.ts | 145 ++++----- packages/grpc-js/src/load-balancer.ts | 20 +- packages/grpc-js/src/picker.ts | 17 +- packages/grpc-js/src/resolver-dns.ts | 51 +--- packages/grpc-js/src/resolver-ip.ts | 10 +- packages/grpc-js/src/resolver-uds.ts | 8 +- packages/grpc-js/src/resolver.ts | 4 +- .../grpc-js/src/resolving-load-balancer.ts | 8 +- packages/grpc-js/src/server.ts | 5 +- packages/grpc-js/src/subchannel-address.ts | 36 +++ packages/grpc-js/src/subchannel-interface.ts | 42 ++- packages/grpc-js/src/subchannel.ts | 12 + packages/grpc-js/test/common.ts | 10 +- packages/grpc-js/test/test-pick-first.ts | 115 ++++--- packages/grpc-js/test/test-resolver.ts | 240 +++++---------- 19 files changed, 668 insertions(+), 504 deletions(-) diff --git a/packages/grpc-js/src/experimental.ts b/packages/grpc-js/src/experimental.ts index 42fd577c..0c7bc75e 100644 --- a/packages/grpc-js/src/experimental.ts +++ b/packages/grpc-js/src/experimental.ts @@ -17,11 +17,15 @@ export { registerLoadBalancerType, selectLbConfigFromList, parseLoadBalancingConfig, - isLoadBalancerNameRegistered + isLoadBalancerNameRegistered, } from './load-balancer'; +export { LeafLoadBalancer } from './load-balancer-pick-first'; export { SubchannelAddress, subchannelAddressToString, + Endpoint, + endpointToString, + endpointHasAddress, } from './subchannel-address'; export { ChildLoadBalancerHandler } from './load-balancer-child-handler'; export { diff --git a/packages/grpc-js/src/load-balancer-child-handler.ts b/packages/grpc-js/src/load-balancer-child-handler.ts index b23f1926..11bfac21 100644 --- a/packages/grpc-js/src/load-balancer-child-handler.ts +++ b/packages/grpc-js/src/load-balancer-child-handler.ts @@ -21,7 +21,7 @@ import { TypedLoadBalancingConfig, createLoadBalancer, } from './load-balancer'; -import { SubchannelAddress } from './subchannel-address'; +import { Endpoint, SubchannelAddress } from './subchannel-address'; import { ChannelOptions } from './channel-options'; import { ConnectivityState } from './connectivity-state'; import { Picker } from './picker'; @@ -95,12 +95,12 @@ export class ChildLoadBalancerHandler implements LoadBalancer { /** * Prerequisites: lbConfig !== null and lbConfig.name is registered - * @param addressList + * @param endpointList * @param lbConfig * @param attributes */ updateAddressList( - addressList: SubchannelAddress[], + endpointList: Endpoint[], lbConfig: TypedLoadBalancingConfig, attributes: { [key: string]: unknown } ): void { @@ -131,7 +131,7 @@ export class ChildLoadBalancerHandler implements LoadBalancer { } } this.latestConfig = lbConfig; - childToUpdate.updateAddressList(addressList, lbConfig, attributes); + childToUpdate.updateAddressList(endpointList, lbConfig, attributes); } exitIdle(): void { if (this.currentChild) { diff --git a/packages/grpc-js/src/load-balancer-outlier-detection.ts b/packages/grpc-js/src/load-balancer-outlier-detection.ts index 63a1d8f1..b0c648ef 100644 --- a/packages/grpc-js/src/load-balancer-outlier-detection.ts +++ b/packages/grpc-js/src/load-balancer-outlier-detection.ts @@ -32,12 +32,14 @@ import { import { ChildLoadBalancerHandler } from './load-balancer-child-handler'; import { PickArgs, Picker, PickResult, PickResultType } from './picker'; import { + Endpoint, SubchannelAddress, - subchannelAddressToString, + endpointHasAddress, + endpointToString, + subchannelAddressEqual, } from './subchannel-address'; import { BaseSubchannelWrapper, - ConnectivityStateListener, SubchannelInterface, } from './subchannel-interface'; import * as logging from './logging'; @@ -107,7 +109,11 @@ function validateFieldType( expectedType: TypeofValues, objectName?: string ) { - if (fieldName in obj && obj[fieldName] !== undefined && typeof obj[fieldName] !== expectedType) { + if ( + fieldName in obj && + obj[fieldName] !== undefined && + typeof obj[fieldName] !== expectedType + ) { const fullFieldName = objectName ? `${objectName}.${fieldName}` : fieldName; throw new Error( `outlier detection config ${fullFieldName} parse error: expected ${expectedType}, got ${typeof obj[ @@ -149,7 +155,11 @@ function validatePositiveDuration( function validatePercentage(obj: any, fieldName: string, objectName?: string) { const fullFieldName = objectName ? `${objectName}.${fieldName}` : fieldName; validateFieldType(obj, fieldName, 'number', objectName); - if (fieldName in obj && obj[fieldName] !== undefined && !(obj[fieldName] >= 0 && obj[fieldName] <= 100)) { + if ( + fieldName in obj && + obj[fieldName] !== undefined && + !(obj[fieldName] >= 0 && obj[fieldName] <= 100) + ) { throw new Error( `outlier detection config ${fullFieldName} parse error: value out of range for percentage (0-100)` ); @@ -175,9 +185,7 @@ export class OutlierDetectionLoadBalancingConfig failurePercentageEjection: Partial | null, private readonly childPolicy: TypedLoadBalancingConfig ) { - if ( - childPolicy.getLoadBalancerName() === 'pick_first' - ) { + if (childPolicy.getLoadBalancerName() === 'pick_first') { throw new Error( 'outlier_detection LB policy cannot have a pick_first child policy' ); @@ -207,9 +215,10 @@ export class OutlierDetectionLoadBalancingConfig max_ejection_time: msToDuration(this.maxEjectionTimeMs), max_ejection_percent: this.maxEjectionPercent, success_rate_ejection: this.successRateEjection ?? undefined, - failure_percentage_ejection: this.failurePercentageEjection ?? undefined, - child_policy: [this.childPolicy.toJsonObject()] - } + failure_percentage_ejection: + this.failurePercentageEjection ?? undefined, + child_policy: [this.childPolicy.toJsonObject()], + }, }; } @@ -240,7 +249,10 @@ export class OutlierDetectionLoadBalancingConfig validatePositiveDuration(obj, 'base_ejection_time'); validatePositiveDuration(obj, 'max_ejection_time'); validatePercentage(obj, 'max_ejection_percent'); - if ('success_rate_ejection' in obj && obj.success_rate_ejection !== undefined) { + if ( + 'success_rate_ejection' in obj && + obj.success_rate_ejection !== undefined + ) { if (typeof obj.success_rate_ejection !== 'object') { throw new Error( 'outlier detection config success_rate_ejection must be an object' @@ -270,7 +282,10 @@ export class OutlierDetectionLoadBalancingConfig 'success_rate_ejection' ); } - if ('failure_percentage_ejection' in obj && obj.failure_percentage_ejection !== undefined) { + if ( + 'failure_percentage_ejection' in obj && + obj.failure_percentage_ejection !== undefined + ) { if (typeof obj.failure_percentage_ejection !== 'object') { throw new Error( 'outlier detection config failure_percentage_ejection must be an object' @@ -305,7 +320,9 @@ export class OutlierDetectionLoadBalancingConfig } const childPolicy = selectLbConfigFromList(obj.child_policy); if (!childPolicy) { - throw new Error('outlier detection config child_policy: no valid recognized policy found'); + throw new Error( + 'outlier detection config child_policy: no valid recognized policy found' + ); } return new OutlierDetectionLoadBalancingConfig( @@ -324,55 +341,12 @@ class OutlierDetectionSubchannelWrapper extends BaseSubchannelWrapper implements SubchannelInterface { - private childSubchannelState: ConnectivityState; - private stateListeners: ConnectivityStateListener[] = []; - private ejected = false; private refCount = 0; constructor( childSubchannel: SubchannelInterface, private mapEntry?: MapEntry ) { super(childSubchannel); - this.childSubchannelState = childSubchannel.getConnectivityState(); - childSubchannel.addConnectivityStateListener( - (subchannel, previousState, newState, keepaliveTime) => { - this.childSubchannelState = newState; - if (!this.ejected) { - for (const listener of this.stateListeners) { - listener(this, previousState, newState, keepaliveTime); - } - } - } - ); - } - - getConnectivityState(): ConnectivityState { - if (this.ejected) { - return ConnectivityState.TRANSIENT_FAILURE; - } else { - return this.childSubchannelState; - } - } - - /** - * Add a listener function to be called whenever the wrapper's - * connectivity state changes. - * @param listener - */ - addConnectivityStateListener(listener: ConnectivityStateListener) { - this.stateListeners.push(listener); - } - - /** - * Remove a listener previously added with `addConnectivityStateListener` - * @param listener A reference to a function previously passed to - * `addConnectivityStateListener` - */ - removeConnectivityStateListener(listener: ConnectivityStateListener) { - const listenerIndex = this.stateListeners.indexOf(listener); - if (listenerIndex > -1) { - this.stateListeners.splice(listenerIndex, 1); - } } ref() { @@ -394,27 +368,11 @@ class OutlierDetectionSubchannelWrapper } eject() { - this.ejected = true; - for (const listener of this.stateListeners) { - listener( - this, - this.childSubchannelState, - ConnectivityState.TRANSIENT_FAILURE, - -1 - ); - } + this.setHealthy(false); } uneject() { - this.ejected = false; - for (const listener of this.stateListeners) { - listener( - this, - ConnectivityState.TRANSIENT_FAILURE, - this.childSubchannelState, - -1 - ); - } + this.setHealthy(true); } getMapEntry(): MapEntry | undefined { @@ -459,13 +417,6 @@ class CallCounter { } } -interface MapEntry { - counter: CallCounter; - currentEjectionTimestamp: Date | null; - ejectionTimeMultiplier: number; - subchannelWrappers: OutlierDetectionSubchannelWrapper[]; -} - class OutlierDetectionPicker implements Picker { constructor(private wrappedPicker: Picker, private countCalls: boolean) {} pick(pickArgs: PickArgs): PickResult { @@ -503,9 +454,133 @@ class OutlierDetectionPicker implements Picker { } } +interface MapEntry { + counter: CallCounter; + currentEjectionTimestamp: Date | null; + ejectionTimeMultiplier: number; + subchannelWrappers: OutlierDetectionSubchannelWrapper[]; +} + +interface EndpointMapEntry { + key: Endpoint; + value: MapEntry; +} + +function endpointEqualUnordered( + endpoint1: Endpoint, + endpoint2: Endpoint +): boolean { + if (endpoint1.addresses.length !== endpoint2.addresses.length) { + return false; + } + for (const address1 of endpoint1.addresses) { + let matchFound = false; + for (const address2 of endpoint2.addresses) { + if (subchannelAddressEqual(address1, address2)) { + matchFound = true; + break; + } + } + if (!matchFound) { + return false; + } + } + return true; +} + +class EndpointMap { + private map: Set = new Set(); + + get size() { + return this.map.size; + } + + getForSubchannelAddress(address: SubchannelAddress): MapEntry | undefined { + for (const entry of this.map) { + if (endpointHasAddress(entry.key, address)) { + return entry.value; + } + } + return undefined; + } + + /** + * Delete any entries in this map with keys that are not in endpoints + * @param endpoints + */ + deleteMissing(endpoints: Endpoint[]) { + for (const entry of this.map) { + let foundEntry = false; + for (const endpoint of endpoints) { + if (endpointEqualUnordered(endpoint, entry.key)) { + foundEntry = true; + } + } + if (!foundEntry) { + this.map.delete(entry); + } + } + } + + get(endpoint: Endpoint): MapEntry | undefined { + for (const entry of this.map) { + if (endpointEqualUnordered(endpoint, entry.key)) { + return entry.value; + } + } + return undefined; + } + + set(endpoint: Endpoint, mapEntry: MapEntry) { + for (const entry of this.map) { + if (endpointEqualUnordered(endpoint, entry.key)) { + entry.value = mapEntry; + return; + } + } + this.map.add({ key: endpoint, value: mapEntry }); + } + + delete(endpoint: Endpoint) { + for (const entry of this.map) { + if (endpointEqualUnordered(endpoint, entry.key)) { + this.map.delete(entry); + return; + } + } + } + + has(endpoint: Endpoint): boolean { + for (const entry of this.map) { + if (endpointEqualUnordered(endpoint, entry.key)) { + return true; + } + } + return false; + } + + *keys(): IterableIterator { + for (const entry of this.map) { + yield entry.key; + } + } + + *values(): IterableIterator { + for (const entry of this.map) { + yield entry.value; + } + } + + *entries(): IterableIterator<[Endpoint, MapEntry]> { + for (const entry of this.map) { + yield [entry.key, entry.value]; + } + } +} + export class OutlierDetectionLoadBalancer implements LoadBalancer { private childBalancer: ChildLoadBalancerHandler; - private addressMap: Map = new Map(); + private entryMap = new EndpointMap(); private latestConfig: OutlierDetectionLoadBalancingConfig | null = null; private ejectionTimer: NodeJS.Timeout; private timerStartTime: Date | null = null; @@ -521,9 +596,8 @@ export class OutlierDetectionLoadBalancer implements LoadBalancer { subchannelAddress, subchannelArgs ); - const mapEntry = this.addressMap.get( - subchannelAddressToString(subchannelAddress) - ); + const mapEntry = + this.entryMap.getForSubchannelAddress(subchannelAddress); const subchannelWrapper = new OutlierDetectionSubchannelWrapper( originalSubchannel, mapEntry @@ -561,12 +635,12 @@ export class OutlierDetectionLoadBalancer implements LoadBalancer { private getCurrentEjectionPercent() { let ejectionCount = 0; - for (const mapEntry of this.addressMap.values()) { + for (const mapEntry of this.entryMap.values()) { if (mapEntry.currentEjectionTimestamp !== null) { ejectionCount += 1; } } - return (ejectionCount * 100) / this.addressMap.size; + return (ejectionCount * 100) / this.entryMap.size; } private runSuccessRateCheck(ejectionTimestamp: Date) { @@ -582,12 +656,12 @@ export class OutlierDetectionLoadBalancer implements LoadBalancer { const targetRequestVolume = successRateConfig.request_volume; let addresesWithTargetVolume = 0; const successRates: number[] = []; - for (const [address, mapEntry] of this.addressMap) { + for (const [endpoint, mapEntry] of this.entryMap.entries()) { const successes = mapEntry.counter.getLastSuccesses(); const failures = mapEntry.counter.getLastFailures(); trace( 'Stats for ' + - address + + endpointToString(endpoint) + ': successes=' + successes + ' failures=' + @@ -631,7 +705,7 @@ export class OutlierDetectionLoadBalancer implements LoadBalancer { ); // Step 3 - for (const [address, mapEntry] of this.addressMap.entries()) { + for (const [address, mapEntry] of this.entryMap.entries()) { // Step 3.i if ( this.getCurrentEjectionPercent() >= @@ -683,7 +757,7 @@ export class OutlierDetectionLoadBalancer implements LoadBalancer { ); // Step 1 let addressesWithTargetVolume = 0; - for (const mapEntry of this.addressMap.values()) { + for (const mapEntry of this.entryMap.values()) { const successes = mapEntry.counter.getLastSuccesses(); const failures = mapEntry.counter.getLastFailures(); if (successes + failures >= failurePercentageConfig.request_volume) { @@ -695,7 +769,7 @@ export class OutlierDetectionLoadBalancer implements LoadBalancer { } // Step 2 - for (const [address, mapEntry] of this.addressMap.entries()) { + for (const [address, mapEntry] of this.entryMap.entries()) { // Step 2.i if ( this.getCurrentEjectionPercent() >= @@ -746,7 +820,7 @@ export class OutlierDetectionLoadBalancer implements LoadBalancer { } private switchAllBuckets() { - for (const mapEntry of this.addressMap.values()) { + for (const mapEntry of this.entryMap.values()) { mapEntry.counter.switchBuckets(); } } @@ -771,7 +845,7 @@ export class OutlierDetectionLoadBalancer implements LoadBalancer { this.runSuccessRateCheck(ejectionTimestamp); this.runFailurePercentageCheck(ejectionTimestamp); - for (const [address, mapEntry] of this.addressMap.entries()) { + for (const [address, mapEntry] of this.entryMap.entries()) { if (mapEntry.currentEjectionTimestamp === null) { if (mapEntry.ejectionTimeMultiplier > 0) { mapEntry.ejectionTimeMultiplier -= 1; @@ -798,21 +872,17 @@ export class OutlierDetectionLoadBalancer implements LoadBalancer { } updateAddressList( - addressList: SubchannelAddress[], + endpointList: Endpoint[], lbConfig: TypedLoadBalancingConfig, attributes: { [key: string]: unknown } ): void { if (!(lbConfig instanceof OutlierDetectionLoadBalancingConfig)) { return; } - const subchannelAddresses = new Set(); - for (const address of addressList) { - subchannelAddresses.add(subchannelAddressToString(address)); - } - for (const address of subchannelAddresses) { - if (!this.addressMap.has(address)) { - trace('Adding map entry for ' + address); - this.addressMap.set(address, { + for (const endpoint of endpointList) { + if (!this.entryMap.has(endpoint)) { + trace('Adding map entry for ' + endpointToString(endpoint)); + this.entryMap.set(endpoint, { counter: new CallCounter(), currentEjectionTimestamp: null, ejectionTimeMultiplier: 0, @@ -820,14 +890,9 @@ export class OutlierDetectionLoadBalancer implements LoadBalancer { }); } } - for (const key of this.addressMap.keys()) { - if (!subchannelAddresses.has(key)) { - trace('Removing map entry for ' + key); - this.addressMap.delete(key); - } - } + this.entryMap.deleteMissing(endpointList); const childPolicy = lbConfig.getChildPolicy(); - this.childBalancer.updateAddressList(addressList, childPolicy, attributes); + this.childBalancer.updateAddressList(endpointList, childPolicy, attributes); if ( lbConfig.getSuccessRateEjectionConfig() || @@ -850,7 +915,7 @@ export class OutlierDetectionLoadBalancer implements LoadBalancer { trace('Counting disabled. Cancelling timer.'); this.timerStartTime = null; clearTimeout(this.ejectionTimer); - for (const mapEntry of this.addressMap.values()) { + for (const mapEntry of this.entryMap.values()) { this.uneject(mapEntry); mapEntry.ejectionTimeMultiplier = 0; } diff --git a/packages/grpc-js/src/load-balancer-pick-first.ts b/packages/grpc-js/src/load-balancer-pick-first.ts index 8635482c..7d394821 100644 --- a/packages/grpc-js/src/load-balancer-pick-first.ts +++ b/packages/grpc-js/src/load-balancer-pick-first.ts @@ -21,6 +21,7 @@ import { TypedLoadBalancingConfig, registerDefaultLoadBalancerType, registerLoadBalancerType, + createChildChannelControlHelper, } from './load-balancer'; import { ConnectivityState } from './connectivity-state'; import { @@ -31,13 +32,16 @@ import { PickResultType, UnavailablePicker, } from './picker'; -import { SubchannelAddress } from './subchannel-address'; +import { Endpoint, SubchannelAddress } from './subchannel-address'; import * as logging from './logging'; import { LogVerbosity } from './constants'; import { SubchannelInterface, ConnectivityStateListener, + HealthListener, } from './subchannel-interface'; +import { isTcpSubchannelAddress } from './subchannel-address'; +import { isIPv6 } from 'net'; const TRACER_NAME = 'pick_first'; @@ -125,6 +129,39 @@ export function shuffled(list: T[]): T[] { return result; } +/** + * Interleave addresses in addressList by family in accordance with RFC-8304 section 4 + * @param addressList + * @returns + */ +function interleaveAddressFamilies( + addressList: SubchannelAddress[] +): SubchannelAddress[] { + const result: SubchannelAddress[] = []; + const ipv6Addresses: SubchannelAddress[] = []; + const ipv4Addresses: SubchannelAddress[] = []; + const ipv6First = + isTcpSubchannelAddress(addressList[0]) && isIPv6(addressList[0].host); + for (const address of addressList) { + if (isTcpSubchannelAddress(address) && isIPv6(address.host)) { + ipv6Addresses.push(address); + } else { + ipv4Addresses.push(address); + } + } + const firstList = ipv6First ? ipv6Addresses : ipv4Addresses; + const secondList = ipv6First ? ipv4Addresses : ipv6Addresses; + for (let i = 0; i < Math.max(firstList.length, secondList.length); i++) { + if (i < firstList.length) { + result.push(firstList[i]); + } + if (i < secondList.length) { + result.push(secondList[i]); + } + } + return result; +} + export class PickFirstLoadBalancer implements LoadBalancer { /** * The list of subchannels this load balancer is currently attempting to @@ -157,6 +194,9 @@ export class PickFirstLoadBalancer implements LoadBalancer { ) => { this.onSubchannelStateUpdate(subchannel, previousState, newState); }; + + private pickedSubchannelHealthListener: HealthListener = () => + this.calculateAndReportNewState(); /** * Timer reference for the timer tracking when to start */ @@ -179,7 +219,10 @@ export class PickFirstLoadBalancer implements LoadBalancer { * @param channelControlHelper `ChannelControlHelper` instance provided by * this load balancer's owner. */ - constructor(private readonly channelControlHelper: ChannelControlHelper) { + constructor( + private readonly channelControlHelper: ChannelControlHelper, + private reportHealthStatus = false + ) { this.connectionDelayTimeout = setTimeout(() => {}, 0); clearTimeout(this.connectionDelayTimeout); } @@ -190,10 +233,19 @@ export class PickFirstLoadBalancer implements LoadBalancer { private calculateAndReportNewState() { if (this.currentPick) { - this.updateState( - ConnectivityState.READY, - new PickFirstPicker(this.currentPick) - ); + if (this.reportHealthStatus && !this.currentPick.isHealthy()) { + this.updateState( + ConnectivityState.TRANSIENT_FAILURE, + new UnavailablePicker({ + details: `Picked subchannel ${this.currentPick.getAddress()} is unhealthy`, + }) + ); + } else { + this.updateState( + ConnectivityState.READY, + new PickFirstPicker(this.currentPick) + ); + } } else if (this.children.length === 0) { this.updateState(ConnectivityState.IDLE, new QueuePicker(this)); } else { @@ -235,6 +287,11 @@ export class PickFirstLoadBalancer implements LoadBalancer { this.channelControlHelper.removeChannelzChild( currentPick.getChannelzRef() ); + if (this.reportHealthStatus) { + currentPick.removeHealthStateWatcher( + this.pickedSubchannelHealthListener + ); + } } } @@ -306,7 +363,7 @@ export class PickFirstLoadBalancer implements LoadBalancer { this.children[subchannelIndex].subchannel.getAddress() ); process.nextTick(() => { - this.children[subchannelIndex].subchannel.startConnecting(); + this.children[subchannelIndex]?.subchannel.startConnecting(); }); } this.connectionDelayTimeout = setTimeout(() => { @@ -320,17 +377,12 @@ export class PickFirstLoadBalancer implements LoadBalancer { } trace('Pick subchannel with address ' + subchannel.getAddress()); this.stickyTransientFailureMode = false; - if (this.currentPick !== null) { - this.currentPick.unref(); - this.channelControlHelper.removeChannelzChild( - this.currentPick.getChannelzRef() - ); - this.currentPick.removeConnectivityStateListener( - this.subchannelStateListener - ); - } + this.removeCurrentPick(); this.currentPick = subchannel; subchannel.ref(); + if (this.reportHealthStatus) { + subchannel.addHealthStateWatcher(this.pickedSubchannelHealthListener); + } this.channelControlHelper.addChannelzChild(subchannel.getChannelzRef()); this.resetSubchannelList(); clearTimeout(this.connectionDelayTimeout); @@ -373,7 +425,7 @@ export class PickFirstLoadBalancer implements LoadBalancer { } updateAddressList( - addressList: SubchannelAddress[], + endpointList: Endpoint[], lbConfig: TypedLoadBalancingConfig ): void { if (!(lbConfig instanceof PickFirstLoadBalancingConfig)) { @@ -383,8 +435,15 @@ export class PickFirstLoadBalancer implements LoadBalancer { * previous update, to minimize churn. Now the DNS resolver is * rate-limited, so that is less of a concern. */ if (lbConfig.getShuffleAddressList()) { - addressList = shuffled(addressList); + endpointList = shuffled(endpointList); } + const rawAddressList = ([] as SubchannelAddress[]).concat( + ...endpointList.map(endpoint => endpoint.addresses) + ); + if (rawAddressList.length === 0) { + throw new Error('No addresses in endpoint list passed to pick_first'); + } + const addressList = interleaveAddressFamilies(rawAddressList); const newChildrenList = addressList.map(address => ({ subchannel: this.channelControlHelper.createSubchannel(address, {}), hasReportedTransientFailure: false, @@ -438,6 +497,59 @@ export class PickFirstLoadBalancer implements LoadBalancer { } } +const LEAF_CONFIG = new PickFirstLoadBalancingConfig(false); + +/** + * This class handles the leaf load balancing operations for a single endpoint. + * It is a thin wrapper around a PickFirstLoadBalancer with a different API + * that more closely reflects how it will be used as a leaf balancer. + */ +export class LeafLoadBalancer { + private pickFirstBalancer: PickFirstLoadBalancer; + private latestState: ConnectivityState = ConnectivityState.IDLE; + private latestPicker: Picker; + constructor( + private endpoint: Endpoint, + channelControlHelper: ChannelControlHelper + ) { + const childChannelControlHelper = createChildChannelControlHelper( + channelControlHelper, + { + updateState: (connectivityState, picker) => { + this.latestState = connectivityState; + this.latestPicker = picker; + channelControlHelper.updateState(connectivityState, picker); + }, + } + ); + this.pickFirstBalancer = new PickFirstLoadBalancer( + childChannelControlHelper, + /* reportHealthStatus= */ true + ); + this.latestPicker = new QueuePicker(this.pickFirstBalancer); + } + + startConnecting() { + this.pickFirstBalancer.updateAddressList([this.endpoint], LEAF_CONFIG); + } + + getConnectivityState() { + return this.latestState; + } + + getPicker() { + return this.latestPicker; + } + + getEndpoint() { + return this.endpoint; + } + + destroy() { + this.pickFirstBalancer.destroy(); + } +} + export function setup(): void { registerLoadBalancerType( TYPE_NAME, diff --git a/packages/grpc-js/src/load-balancer-round-robin.ts b/packages/grpc-js/src/load-balancer-round-robin.ts index a611cfd6..986181a8 100644 --- a/packages/grpc-js/src/load-balancer-round-robin.ts +++ b/packages/grpc-js/src/load-balancer-round-robin.ts @@ -20,26 +20,24 @@ import { ChannelControlHelper, TypedLoadBalancingConfig, registerLoadBalancerType, + createChildChannelControlHelper, } from './load-balancer'; import { ConnectivityState } from './connectivity-state'; import { QueuePicker, Picker, PickArgs, - CompletePickResult, - PickResultType, UnavailablePicker, + PickResult, } from './picker'; -import { - SubchannelAddress, - subchannelAddressToString, -} from './subchannel-address'; import * as logging from './logging'; import { LogVerbosity } from './constants'; import { - ConnectivityStateListener, - SubchannelInterface, -} from './subchannel-interface'; + Endpoint, + endpointEqual, + endpointToString, +} from './subchannel-address'; +import { LeafLoadBalancer } from './load-balancer-pick-first'; const TRACER_NAME = 'round_robin'; @@ -70,20 +68,14 @@ class RoundRobinLoadBalancingConfig implements TypedLoadBalancingConfig { class RoundRobinPicker implements Picker { constructor( - private readonly subchannelList: SubchannelInterface[], + private readonly children: { endpoint: Endpoint; picker: Picker }[], private nextIndex = 0 ) {} - pick(pickArgs: PickArgs): CompletePickResult { - const pickedSubchannel = this.subchannelList[this.nextIndex]; - this.nextIndex = (this.nextIndex + 1) % this.subchannelList.length; - return { - pickResultType: PickResultType.COMPLETE, - subchannel: pickedSubchannel, - status: null, - onCallStarted: null, - onCallEnded: null, - }; + pick(pickArgs: PickArgs): PickResult { + const childPicker = this.children[this.nextIndex].picker; + this.nextIndex = (this.nextIndex + 1) % this.children.length; + return childPicker.pick(pickArgs); } /** @@ -91,54 +83,51 @@ class RoundRobinPicker implements Picker { * balancer implementation to preserve this part of the picker state if * possible when a subchannel connects or disconnects. */ - peekNextSubchannel(): SubchannelInterface { - return this.subchannelList[this.nextIndex]; + peekNextEndpoint(): Endpoint { + return this.children[this.nextIndex].endpoint; } } export class RoundRobinLoadBalancer implements LoadBalancer { - private subchannels: SubchannelInterface[] = []; + private children: LeafLoadBalancer[] = []; private currentState: ConnectivityState = ConnectivityState.IDLE; - private subchannelStateListener: ConnectivityStateListener; - private currentReadyPicker: RoundRobinPicker | null = null; - constructor(private readonly channelControlHelper: ChannelControlHelper) { - this.subchannelStateListener = ( - subchannel: SubchannelInterface, - previousState: ConnectivityState, - newState: ConnectivityState - ) => { - this.calculateAndUpdateState(); + private updatesPaused = false; - if ( - newState === ConnectivityState.TRANSIENT_FAILURE || - newState === ConnectivityState.IDLE - ) { - this.channelControlHelper.requestReresolution(); - subchannel.startConnecting(); + private childChannelControlHelper: ChannelControlHelper; + + constructor(private readonly channelControlHelper: ChannelControlHelper) { + this.childChannelControlHelper = createChildChannelControlHelper( + channelControlHelper, + { + updateState: (connectivityState, picker) => { + this.calculateAndUpdateState(); + }, } - }; + ); } - private countSubchannelsWithState(state: ConnectivityState) { - return this.subchannels.filter( - subchannel => subchannel.getConnectivityState() === state - ).length; + private countChildrenWithState(state: ConnectivityState) { + return this.children.filter(child => child.getConnectivityState() === state) + .length; } private calculateAndUpdateState() { - if (this.countSubchannelsWithState(ConnectivityState.READY) > 0) { - const readySubchannels = this.subchannels.filter( - subchannel => - subchannel.getConnectivityState() === ConnectivityState.READY + if (this.updatesPaused) { + return; + } + if (this.countChildrenWithState(ConnectivityState.READY) > 0) { + const readyChildren = this.children.filter( + child => child.getConnectivityState() === ConnectivityState.READY ); let index = 0; if (this.currentReadyPicker !== null) { - index = readySubchannels.indexOf( - this.currentReadyPicker.peekNextSubchannel() + const nextPickedEndpoint = this.currentReadyPicker.peekNextEndpoint(); + index = readyChildren.findIndex(child => + endpointEqual(child.getEndpoint(), nextPickedEndpoint) ); if (index < 0) { index = 0; @@ -146,14 +135,18 @@ export class RoundRobinLoadBalancer implements LoadBalancer { } this.updateState( ConnectivityState.READY, - new RoundRobinPicker(readySubchannels, index) + new RoundRobinPicker( + readyChildren.map(child => ({ + endpoint: child.getEndpoint(), + picker: child.getPicker(), + })), + index + ) ); - } else if ( - this.countSubchannelsWithState(ConnectivityState.CONNECTING) > 0 - ) { + } else if (this.countChildrenWithState(ConnectivityState.CONNECTING) > 0) { this.updateState(ConnectivityState.CONNECTING, new QueuePicker(this)); } else if ( - this.countSubchannelsWithState(ConnectivityState.TRANSIENT_FAILURE) > 0 + this.countChildrenWithState(ConnectivityState.TRANSIENT_FAILURE) > 0 ) { this.updateState( ConnectivityState.TRANSIENT_FAILURE, @@ -180,51 +173,35 @@ export class RoundRobinLoadBalancer implements LoadBalancer { } private resetSubchannelList() { - for (const subchannel of this.subchannels) { - subchannel.removeConnectivityStateListener(this.subchannelStateListener); - subchannel.unref(); - this.channelControlHelper.removeChannelzChild( - subchannel.getChannelzRef() - ); + for (const child of this.children) { + child.destroy(); } - this.subchannels = []; } updateAddressList( - addressList: SubchannelAddress[], + endpointList: Endpoint[], lbConfig: TypedLoadBalancingConfig ): void { this.resetSubchannelList(); - trace( - 'Connect to address list ' + - addressList.map(address => subchannelAddressToString(address)) + trace('Connect to endpoint list ' + endpointList.map(endpointToString)); + this.updatesPaused = true; + this.children = endpointList.map( + endpoint => new LeafLoadBalancer(endpoint, this.childChannelControlHelper) ); - this.subchannels = addressList.map(address => - this.channelControlHelper.createSubchannel(address, {}) - ); - for (const subchannel of this.subchannels) { - subchannel.ref(); - subchannel.addConnectivityStateListener(this.subchannelStateListener); - this.channelControlHelper.addChannelzChild(subchannel.getChannelzRef()); - const subchannelState = subchannel.getConnectivityState(); - if ( - subchannelState === ConnectivityState.IDLE || - subchannelState === ConnectivityState.TRANSIENT_FAILURE - ) { - subchannel.startConnecting(); - } + for (const child of this.children) { + child.startConnecting(); } + this.updatesPaused = false; this.calculateAndUpdateState(); } exitIdle(): void { - for (const subchannel of this.subchannels) { - subchannel.startConnecting(); - } + /* The round_robin LB policy is only in the IDLE state if it has no + * addresses to try to connect to and it has no picked subchannel. + * In that case, there is no meaningful action that can be taken here. */ } resetBackoff(): void { - /* The pick first load balancer does not have a connection backoff, so this - * does nothing */ + // This LB policy has no backoff to reset } destroy(): void { this.resetSubchannelList(); diff --git a/packages/grpc-js/src/load-balancer.ts b/packages/grpc-js/src/load-balancer.ts index d5d69543..1145fdc9 100644 --- a/packages/grpc-js/src/load-balancer.ts +++ b/packages/grpc-js/src/load-balancer.ts @@ -16,7 +16,7 @@ */ import { ChannelOptions } from './channel-options'; -import { SubchannelAddress } from './subchannel-address'; +import { Endpoint, SubchannelAddress } from './subchannel-address'; import { ConnectivityState } from './connectivity-state'; import { Picker } from './picker'; import { ChannelRef, SubchannelRef } from './channelz'; @@ -95,12 +95,12 @@ export interface LoadBalancer { * The load balancer will start establishing connections with the new list, * but will continue using any existing connections until the new connections * are established - * @param addressList The new list of addresses to connect to + * @param endpointList The new list of addresses to connect to * @param lbConfig The load balancing config object from the service config, * if one was provided */ updateAddressList( - addressList: SubchannelAddress[], + endpointList: Endpoint[], lbConfig: TypedLoadBalancingConfig, attributes: { [key: string]: unknown } ): void; @@ -185,7 +185,9 @@ export function isLoadBalancerNameRegistered(typeName: string): boolean { return typeName in registeredLoadBalancerTypes; } -export function parseLoadBalancingConfig(rawConfig: LoadBalancingConfig): TypedLoadBalancingConfig { +export function parseLoadBalancingConfig( + rawConfig: LoadBalancingConfig +): TypedLoadBalancingConfig { const keys = Object.keys(rawConfig); if (keys.length !== 1) { throw new Error( @@ -210,7 +212,9 @@ export function getDefaultConfig() { if (!defaultLoadBalancerType) { throw new Error('No default load balancer type registered'); } - return new registeredLoadBalancerTypes[defaultLoadBalancerType]!.LoadBalancingConfig(); + return new registeredLoadBalancerTypes[ + defaultLoadBalancerType + ]!.LoadBalancingConfig(); } export function selectLbConfigFromList( @@ -221,7 +225,11 @@ export function selectLbConfigFromList( try { return parseLoadBalancingConfig(config); } catch (e) { - log(LogVerbosity.DEBUG, 'Config parsing failed with error', (e as Error).message); + log( + LogVerbosity.DEBUG, + 'Config parsing failed with error', + (e as Error).message + ); continue; } } diff --git a/packages/grpc-js/src/picker.ts b/packages/grpc-js/src/picker.ts index d95eca21..6474269f 100644 --- a/packages/grpc-js/src/picker.ts +++ b/packages/grpc-js/src/picker.ts @@ -97,16 +97,13 @@ export interface Picker { */ export class UnavailablePicker implements Picker { private status: StatusObject; - constructor(status?: StatusObject) { - if (status !== undefined) { - this.status = status; - } else { - this.status = { - code: Status.UNAVAILABLE, - details: 'No connection established', - metadata: new Metadata(), - }; - } + constructor(status?: Partial) { + this.status = { + code: Status.UNAVAILABLE, + details: 'No connection established', + metadata: new Metadata(), + ...status, + }; } pick(pickArgs: PickArgs): TransientFailurePickResult { return { diff --git a/packages/grpc-js/src/resolver-dns.ts b/packages/grpc-js/src/resolver-dns.ts index c40cb8ec..0956b460 100644 --- a/packages/grpc-js/src/resolver-dns.ts +++ b/packages/grpc-js/src/resolver-dns.ts @@ -28,7 +28,7 @@ import { StatusObject } from './call-interface'; import { Metadata } from './metadata'; import * as logging from './logging'; import { LogVerbosity } from './constants'; -import { SubchannelAddress, TcpSubchannelAddress } from './subchannel-address'; +import { Endpoint, TcpSubchannelAddress } from './subchannel-address'; import { GrpcUri, uriToString, splitHostPort } from './uri-parser'; import { isIPv6, isIPv4 } from 'net'; import { ChannelOptions } from './channel-options'; @@ -50,35 +50,11 @@ const DEFAULT_MIN_TIME_BETWEEN_RESOLUTIONS_MS = 30_000; const resolveTxtPromise = util.promisify(dns.resolveTxt); const dnsLookupPromise = util.promisify(dns.lookup); -/** - * Merge any number of arrays into a single alternating array - * @param arrays - */ -function mergeArrays(...arrays: T[][]): T[] { - const result: T[] = []; - for ( - let i = 0; - i < - Math.max.apply( - null, - arrays.map(array => array.length) - ); - i++ - ) { - for (const array of arrays) { - if (i < array.length) { - result.push(array[i]); - } - } - } - return result; -} - /** * Resolver implementation that handles DNS names and IP addresses. */ class DnsResolver implements Resolver { - private readonly ipResult: SubchannelAddress[] | null; + private readonly ipResult: Endpoint[] | null; private readonly dnsHostname: string | null; private readonly port: number | null; /** @@ -89,7 +65,7 @@ class DnsResolver implements Resolver { private readonly minTimeBetweenResolutionsMs: number; private pendingLookupPromise: Promise | null = null; private pendingTxtPromise: Promise | null = null; - private latestLookupResult: TcpSubchannelAddress[] | null = null; + private latestLookupResult: Endpoint[] | null = null; private latestServiceConfig: ServiceConfig | null = null; private latestServiceConfigError: StatusObject | null = null; private percentage: number; @@ -114,8 +90,12 @@ class DnsResolver implements Resolver { if (isIPv4(hostPort.host) || isIPv6(hostPort.host)) { this.ipResult = [ { - host: hostPort.host, - port: hostPort.port ?? DEFAULT_PORT, + addresses: [ + { + host: hostPort.host, + port: hostPort.port ?? DEFAULT_PORT, + }, + ], }, ]; this.dnsHostname = null; @@ -213,18 +193,15 @@ class DnsResolver implements Resolver { this.pendingLookupPromise = null; this.backoff.reset(); this.backoff.stop(); - const ip4Addresses: dns.LookupAddress[] = addressList.filter( - addr => addr.family === 4 - ); - const ip6Addresses: dns.LookupAddress[] = addressList.filter( - addr => addr.family === 6 - ); - this.latestLookupResult = mergeArrays(ip6Addresses, ip4Addresses).map( + const subchannelAddresses: TcpSubchannelAddress[] = addressList.map( addr => ({ host: addr.address, port: +this.port! }) ); + this.latestLookupResult = subchannelAddresses.map(address => ({ + addresses: [address], + })); const allAddressesString: string = '[' + - this.latestLookupResult + subchannelAddresses .map(addr => addr.host + ':' + addr.port) .join(',') + ']'; diff --git a/packages/grpc-js/src/resolver-ip.ts b/packages/grpc-js/src/resolver-ip.ts index 0704131e..cda35d3b 100644 --- a/packages/grpc-js/src/resolver-ip.ts +++ b/packages/grpc-js/src/resolver-ip.ts @@ -20,7 +20,7 @@ import { ChannelOptions } from './channel-options'; import { LogVerbosity, Status } from './constants'; import { Metadata } from './metadata'; import { registerResolver, Resolver, ResolverListener } from './resolver'; -import { SubchannelAddress } from './subchannel-address'; +import { Endpoint, SubchannelAddress } from './subchannel-address'; import { GrpcUri, splitHostPort, uriToString } from './uri-parser'; import * as logging from './logging'; @@ -39,7 +39,7 @@ const IPV6_SCHEME = 'ipv6'; const DEFAULT_PORT = 443; class IpResolver implements Resolver { - private addresses: SubchannelAddress[] = []; + private endpoints: Endpoint[] = []; private error: StatusObject | null = null; constructor( target: GrpcUri, @@ -83,8 +83,8 @@ class IpResolver implements Resolver { port: hostPort.port ?? DEFAULT_PORT, }); } - this.addresses = addresses; - trace('Parsed ' + target.scheme + ' address list ' + this.addresses); + this.endpoints = addresses.map(address => ({ addresses: [address] })); + trace('Parsed ' + target.scheme + ' address list ' + addresses); } updateResolution(): void { process.nextTick(() => { @@ -92,7 +92,7 @@ class IpResolver implements Resolver { this.listener.onError(this.error); } else { this.listener.onSuccessfulResolution( - this.addresses, + this.endpoints, null, null, null, diff --git a/packages/grpc-js/src/resolver-uds.ts b/packages/grpc-js/src/resolver-uds.ts index 24095ec2..4fa1944b 100644 --- a/packages/grpc-js/src/resolver-uds.ts +++ b/packages/grpc-js/src/resolver-uds.ts @@ -15,12 +15,12 @@ */ import { Resolver, ResolverListener, registerResolver } from './resolver'; -import { SubchannelAddress } from './subchannel-address'; +import { Endpoint } from './subchannel-address'; import { GrpcUri } from './uri-parser'; import { ChannelOptions } from './channel-options'; class UdsResolver implements Resolver { - private addresses: SubchannelAddress[] = []; + private endpoints: Endpoint[] = []; constructor( target: GrpcUri, private listener: ResolverListener, @@ -32,12 +32,12 @@ class UdsResolver implements Resolver { } else { path = target.path; } - this.addresses = [{ path }]; + this.endpoints = [{ addresses: [{ path }] }]; } updateResolution(): void { process.nextTick( this.listener.onSuccessfulResolution, - this.addresses, + this.endpoints, null, null, null, diff --git a/packages/grpc-js/src/resolver.ts b/packages/grpc-js/src/resolver.ts index 43508625..4dfa8d13 100644 --- a/packages/grpc-js/src/resolver.ts +++ b/packages/grpc-js/src/resolver.ts @@ -17,7 +17,7 @@ import { MethodConfig, ServiceConfig } from './service-config'; import { StatusObject } from './call-interface'; -import { SubchannelAddress } from './subchannel-address'; +import { Endpoint } from './subchannel-address'; import { GrpcUri, uriToString } from './uri-parser'; import { ChannelOptions } from './channel-options'; import { Metadata } from './metadata'; @@ -55,7 +55,7 @@ export interface ResolverListener { * service configuration was invalid */ onSuccessfulResolution( - addressList: SubchannelAddress[], + addressList: Endpoint[], serviceConfig: ServiceConfig | null, serviceConfigError: StatusObject | null, configSelector: ConfigSelector | null, diff --git a/packages/grpc-js/src/resolving-load-balancer.ts b/packages/grpc-js/src/resolving-load-balancer.ts index e2b2c1fa..22808dc3 100644 --- a/packages/grpc-js/src/resolving-load-balancer.ts +++ b/packages/grpc-js/src/resolving-load-balancer.ts @@ -32,7 +32,7 @@ import { StatusObject } from './call-interface'; import { Metadata } from './metadata'; import * as logging from './logging'; import { LogVerbosity } from './constants'; -import { SubchannelAddress } from './subchannel-address'; +import { Endpoint } from './subchannel-address'; import { GrpcUri, uriToString } from './uri-parser'; import { ChildLoadBalancerHandler } from './load-balancer-child-handler'; import { ChannelOptions } from './channel-options'; @@ -177,7 +177,7 @@ export class ResolvingLoadBalancer implements LoadBalancer { target, { onSuccessfulResolution: ( - addressList: SubchannelAddress[], + endpointList: Endpoint[], serviceConfig: ServiceConfig | null, serviceConfigError: ServiceError | null, configSelector: ConfigSelector | null, @@ -226,7 +226,7 @@ export class ResolvingLoadBalancer implements LoadBalancer { return; } this.childLoadBalancer.updateAddressList( - addressList, + endpointList, loadBalancingConfig, attributes ); @@ -307,7 +307,7 @@ export class ResolvingLoadBalancer implements LoadBalancer { } updateAddressList( - addressList: SubchannelAddress[], + endpointList: Endpoint[], lbConfig: TypedLoadBalancingConfig | null ): never { throw new Error('updateAddressList not supported on ResolvingLoadBalancer'); diff --git a/packages/grpc-js/src/server.ts b/packages/grpc-js/src/server.ts index c9308ca6..4099f135 100644 --- a/packages/grpc-js/src/server.ts +++ b/packages/grpc-js/src/server.ts @@ -631,12 +631,15 @@ export class Server { const resolverListener: ResolverListener = { onSuccessfulResolution: ( - addressList, + endpointList, serviceConfig, serviceConfigError ) => { // We only want one resolution result. Discard all future results resolverListener.onSuccessfulResolution = () => {}; + const addressList = ([] as SubchannelAddress[]).concat( + ...endpointList.map(endpoint => endpoint.addresses) + ); if (addressList.length === 0) { deferredCallback( new Error(`No addresses resolved for port ${port}`), diff --git a/packages/grpc-js/src/subchannel-address.ts b/packages/grpc-js/src/subchannel-address.ts index 1ab88f45..36fd99ea 100644 --- a/packages/grpc-js/src/subchannel-address.ts +++ b/packages/grpc-js/src/subchannel-address.ts @@ -86,3 +86,39 @@ export function stringToSubchannelAddress( }; } } + +export interface Endpoint { + addresses: SubchannelAddress[]; +} + +export function endpointEqual(endpoint1: Endpoint, endpoint2: Endpoint) { + if (endpoint1.addresses.length !== endpoint2.addresses.length) { + return false; + } + for (let i = 0; i < endpoint1.addresses.length; i++) { + if ( + !subchannelAddressEqual(endpoint1.addresses[i], endpoint2.addresses[i]) + ) { + return false; + } + } + return true; +} + +export function endpointToString(endpoint: Endpoint): string { + return ( + '[' + endpoint.addresses.map(subchannelAddressToString).join(', ') + ']' + ); +} + +export function endpointHasAddress( + endpoint: Endpoint, + expectedAddress: SubchannelAddress +): boolean { + for (const address of endpoint.addresses) { + if (subchannelAddressEqual(address, expectedAddress)) { + return true; + } + } + return false; +} diff --git a/packages/grpc-js/src/subchannel-interface.ts b/packages/grpc-js/src/subchannel-interface.ts index 9b947ad3..c7fdca4f 100644 --- a/packages/grpc-js/src/subchannel-interface.ts +++ b/packages/grpc-js/src/subchannel-interface.ts @@ -26,6 +26,8 @@ export type ConnectivityStateListener = ( keepaliveTime: number ) => void; +export type HealthListener = (healthy: boolean) => void; + /** * This is an interface for load balancing policies to use to interact with * subchannels. This allows load balancing policies to wrap and unwrap @@ -45,6 +47,9 @@ export interface SubchannelInterface { ref(): void; unref(): void; getChannelzRef(): SubchannelRef; + isHealthy(): boolean; + addHealthStateWatcher(listener: HealthListener): void; + removeHealthStateWatcher(listener: HealthListener): void; /** * If this is a wrapper, return the wrapped subchannel, otherwise return this */ @@ -58,7 +63,23 @@ export interface SubchannelInterface { } export abstract class BaseSubchannelWrapper implements SubchannelInterface { - constructor(protected child: SubchannelInterface) {} + private healthy = true; + private healthListeners: Set = new Set(); + constructor(protected child: SubchannelInterface) { + child.addHealthStateWatcher(childHealthy => { + /* A change to the child health state only affects this wrapper's overall + * health state if this wrapper is reporting healthy. */ + if (this.healthy) { + this.updateHealthListeners(); + } + }); + } + + private updateHealthListeners(): void { + for (const listener of this.healthListeners) { + listener(this.isHealthy()); + } + } getConnectivityState(): ConnectivityState { return this.child.getConnectivityState(); @@ -87,6 +108,25 @@ export abstract class BaseSubchannelWrapper implements SubchannelInterface { getChannelzRef(): SubchannelRef { return this.child.getChannelzRef(); } + isHealthy(): boolean { + return this.healthy && this.child.isHealthy(); + } + addHealthStateWatcher(listener: HealthListener): void { + this.healthListeners.add(listener); + } + removeHealthStateWatcher(listener: HealthListener): void { + this.healthListeners.delete(listener); + } + protected setHealthy(healthy: boolean): void { + if (healthy !== this.healthy) { + this.healthy = healthy; + /* A change to this wrapper's health state only affects the overall + * reported health state if the child is healthy. */ + if (this.child.isHealthy()) { + this.updateHealthListeners(); + } + } + } getRealSubchannel(): Subchannel { return this.child.getRealSubchannel(); } diff --git a/packages/grpc-js/src/subchannel.ts b/packages/grpc-js/src/subchannel.ts index 6fad9500..cf49ced7 100644 --- a/packages/grpc-js/src/subchannel.ts +++ b/packages/grpc-js/src/subchannel.ts @@ -461,6 +461,18 @@ export class Subchannel { return this.channelzRef; } + isHealthy(): boolean { + return true; + } + + addHealthStateWatcher(listener: (healthy: boolean) => void): void { + // Do nothing with the listener + } + + removeHealthStateWatcher(listener: (healthy: boolean) => void): void { + // Do nothing with the listener + } + getRealSubchannel(): this { return this; } diff --git a/packages/grpc-js/test/common.ts b/packages/grpc-js/test/common.ts index d15a9d5e..20352fb2 100644 --- a/packages/grpc-js/test/common.ts +++ b/packages/grpc-js/test/common.ts @@ -27,7 +27,10 @@ import { loadPackageDefinition, } from '../src/make-client'; import { readFileSync } from 'fs'; -import { SubchannelInterface } from '../src/subchannel-interface'; +import { + HealthListener, + SubchannelInterface, +} from '../src/subchannel-interface'; import { SubchannelRef } from '../src/channelz'; import { Subchannel } from '../src/subchannel'; import { ConnectivityState } from '../src/connectivity-state'; @@ -198,6 +201,11 @@ export class MockSubchannel implements SubchannelInterface { realSubchannelEquals(other: grpc.experimental.SubchannelInterface): boolean { return this === other; } + isHealthy(): boolean { + return true; + } + addHealthStateWatcher(listener: HealthListener): void {} + removeHealthStateWatcher(listener: HealthListener): void {} } export { assert2 }; diff --git a/packages/grpc-js/test/test-pick-first.ts b/packages/grpc-js/test/test-pick-first.ts index e9e9e560..7e862de5 100644 --- a/packages/grpc-js/test/test-pick-first.ts +++ b/packages/grpc-js/test/test-pick-first.ts @@ -29,10 +29,7 @@ import { } from '../src/load-balancer-pick-first'; import { Metadata } from '../src/metadata'; import { Picker } from '../src/picker'; -import { - SubchannelAddress, - subchannelAddressToString, -} from '../src/subchannel-address'; +import { Endpoint, subchannelAddressToString } from '../src/subchannel-address'; import { MockSubchannel, TestClient, TestServer } from './common'; function updateStateCallBackForExpectedStateSequence( @@ -120,7 +117,10 @@ describe('pick_first load balancing policy', () => { } ); const pickFirst = new PickFirstLoadBalancer(channelControlHelper); - pickFirst.updateAddressList([{ host: 'localhost', port: 1 }], config); + pickFirst.updateAddressList( + [{ addresses: [{ host: 'localhost', port: 1 }] }], + config + ); process.nextTick(() => { subchannels[0].transitionToState(ConnectivityState.READY); }); @@ -144,7 +144,10 @@ describe('pick_first load balancing policy', () => { } ); const pickFirst = new PickFirstLoadBalancer(channelControlHelper); - pickFirst.updateAddressList([{ host: 'localhost', port: 1 }], config); + pickFirst.updateAddressList( + [{ addresses: [{ host: 'localhost', port: 1 }] }], + config + ); }); it('Should stay CONNECTING if only some subchannels fail to connect', done => { const channelControlHelper = createChildChannelControlHelper( @@ -159,8 +162,8 @@ describe('pick_first load balancing policy', () => { const pickFirst = new PickFirstLoadBalancer(channelControlHelper); pickFirst.updateAddressList( [ - { host: 'localhost', port: 1 }, - { host: 'localhost', port: 2 }, + { addresses: [{ host: 'localhost', port: 1 }] }, + { addresses: [{ host: 'localhost', port: 2 }] }, ], config ); @@ -181,8 +184,8 @@ describe('pick_first load balancing policy', () => { const pickFirst = new PickFirstLoadBalancer(channelControlHelper); pickFirst.updateAddressList( [ - { host: 'localhost', port: 1 }, - { host: 'localhost', port: 2 }, + { addresses: [{ host: 'localhost', port: 1 }] }, + { addresses: [{ host: 'localhost', port: 2 }] }, ], config ); @@ -206,8 +209,8 @@ describe('pick_first load balancing policy', () => { const pickFirst = new PickFirstLoadBalancer(channelControlHelper); pickFirst.updateAddressList( [ - { host: 'localhost', port: 1 }, - { host: 'localhost', port: 2 }, + { addresses: [{ host: 'localhost', port: 1 }] }, + { addresses: [{ host: 'localhost', port: 2 }] }, ], config ); @@ -245,8 +248,8 @@ describe('pick_first load balancing policy', () => { const pickFirst = new PickFirstLoadBalancer(channelControlHelper); pickFirst.updateAddressList( [ - { host: 'localhost', port: 1 }, - { host: 'localhost', port: 2 }, + { addresses: [{ host: 'localhost', port: 1 }] }, + { addresses: [{ host: 'localhost', port: 2 }] }, ], config ); @@ -272,8 +275,8 @@ describe('pick_first load balancing policy', () => { const pickFirst = new PickFirstLoadBalancer(channelControlHelper); pickFirst.updateAddressList( [ - { host: 'localhost', port: 1 }, - { host: 'localhost', port: 2 }, + { addresses: [{ host: 'localhost', port: 1 }] }, + { addresses: [{ host: 'localhost', port: 2 }] }, ], config ); @@ -303,8 +306,8 @@ describe('pick_first load balancing policy', () => { const pickFirst = new PickFirstLoadBalancer(channelControlHelper); pickFirst.updateAddressList( [ - { host: 'localhost', port: 1 }, - { host: 'localhost', port: 2 }, + { addresses: [{ host: 'localhost', port: 1 }] }, + { addresses: [{ host: 'localhost', port: 2 }] }, ], config ); @@ -312,8 +315,8 @@ describe('pick_first load balancing policy', () => { currentStartState = ConnectivityState.CONNECTING; pickFirst.updateAddressList( [ - { host: 'localhost', port: 3 }, - { host: 'localhost', port: 4 }, + { addresses: [{ host: 'localhost', port: 1 }] }, + { addresses: [{ host: 'localhost', port: 2 }] }, ], config ); @@ -341,14 +344,17 @@ describe('pick_first load balancing policy', () => { const pickFirst = new PickFirstLoadBalancer(channelControlHelper); pickFirst.updateAddressList( [ - { host: 'localhost', port: 1 }, - { host: 'localhost', port: 2 }, + { addresses: [{ host: 'localhost', port: 1 }] }, + { addresses: [{ host: 'localhost', port: 2 }] }, ], config ); process.nextTick(() => { currentStartState = ConnectivityState.READY; - pickFirst.updateAddressList([{ host: 'localhost', port: 3 }], config); + pickFirst.updateAddressList( + [{ addresses: [{ host: 'localhost', port: 3 }] }], + config + ); }); }); it('Should transition from READY to IDLE if the connected subchannel disconnects', done => { @@ -371,7 +377,10 @@ describe('pick_first load balancing policy', () => { } ); const pickFirst = new PickFirstLoadBalancer(channelControlHelper); - pickFirst.updateAddressList([{ host: 'localhost', port: 1 }], config); + pickFirst.updateAddressList( + [{ addresses: [{ host: 'localhost', port: 1 }] }], + config + ); process.nextTick(() => { subchannels[0].transitionToState(ConnectivityState.IDLE); }); @@ -396,10 +405,16 @@ describe('pick_first load balancing policy', () => { } ); const pickFirst = new PickFirstLoadBalancer(channelControlHelper); - pickFirst.updateAddressList([{ host: 'localhost', port: 1 }], config); + pickFirst.updateAddressList( + [{ addresses: [{ host: 'localhost', port: 1 }] }], + config + ); process.nextTick(() => { currentStartState = ConnectivityState.IDLE; - pickFirst.updateAddressList([{ host: 'localhost', port: 2 }], config); + pickFirst.updateAddressList( + [{ addresses: [{ host: 'localhost', port: 2 }] }], + config + ); process.nextTick(() => { subchannels[0].transitionToState(ConnectivityState.IDLE); }); @@ -425,10 +440,16 @@ describe('pick_first load balancing policy', () => { } ); const pickFirst = new PickFirstLoadBalancer(channelControlHelper); - pickFirst.updateAddressList([{ host: 'localhost', port: 1 }], config); + pickFirst.updateAddressList( + [{ addresses: [{ host: 'localhost', port: 1 }] }], + config + ); process.nextTick(() => { currentStartState = ConnectivityState.TRANSIENT_FAILURE; - pickFirst.updateAddressList([{ host: 'localhost', port: 2 }], config); + pickFirst.updateAddressList( + [{ addresses: [{ host: 'localhost', port: 2 }] }], + config + ); process.nextTick(() => { subchannels[0].transitionToState(ConnectivityState.IDLE); }); @@ -454,9 +475,15 @@ describe('pick_first load balancing policy', () => { } ); const pickFirst = new PickFirstLoadBalancer(channelControlHelper); - pickFirst.updateAddressList([{ host: 'localhost', port: 1 }], config); + pickFirst.updateAddressList( + [{ addresses: [{ host: 'localhost', port: 1 }] }], + config + ); process.nextTick(() => { - pickFirst.updateAddressList([{ host: 'localhost', port: 2 }], config); + pickFirst.updateAddressList( + [{ addresses: [{ host: 'localhost', port: 2 }] }], + config + ); process.nextTick(() => { subchannels[0].transitionToState(ConnectivityState.IDLE); }); @@ -490,24 +517,24 @@ describe('pick_first load balancing policy', () => { }, } ); - const addresses: SubchannelAddress[] = []; + const endpoints: Endpoint[] = []; for (let i = 0; i < 10; i++) { - addresses.push({ host: 'localhost', port: i + 1 }); + endpoints.push({ addresses: [{ host: 'localhost', port: i + 1 }] }); } const pickFirst = new PickFirstLoadBalancer(channelControlHelper); /* Pick from 10 subchannels 5 times, with address randomization enabled, * and verify that at least two different subchannels are picked. The * probability choosing the same address every time is 1/10,000, which * I am considering an acceptable flake rate */ - pickFirst.updateAddressList(addresses, shuffleConfig); + pickFirst.updateAddressList(endpoints, shuffleConfig); process.nextTick(() => { - pickFirst.updateAddressList(addresses, shuffleConfig); + pickFirst.updateAddressList(endpoints, shuffleConfig); process.nextTick(() => { - pickFirst.updateAddressList(addresses, shuffleConfig); + pickFirst.updateAddressList(endpoints, shuffleConfig); process.nextTick(() => { - pickFirst.updateAddressList(addresses, shuffleConfig); + pickFirst.updateAddressList(endpoints, shuffleConfig); process.nextTick(() => { - pickFirst.updateAddressList(addresses, shuffleConfig); + pickFirst.updateAddressList(endpoints, shuffleConfig); process.nextTick(() => { assert(pickedSubchannels.size > 1); done(); @@ -546,20 +573,20 @@ describe('pick_first load balancing policy', () => { }, } ); - const addresses: SubchannelAddress[] = []; + const endpoints: Endpoint[] = []; for (let i = 0; i < 10; i++) { - addresses.push({ host: 'localhost', port: i + 1 }); + endpoints.push({ addresses: [{ host: 'localhost', port: i + 1 }] }); } const pickFirst = new PickFirstLoadBalancer(channelControlHelper); - pickFirst.updateAddressList(addresses, config); + pickFirst.updateAddressList(endpoints, config); process.nextTick(() => { - pickFirst.updateAddressList(addresses, config); + pickFirst.updateAddressList(endpoints, config); process.nextTick(() => { - pickFirst.updateAddressList(addresses, config); + pickFirst.updateAddressList(endpoints, config); process.nextTick(() => { - pickFirst.updateAddressList(addresses, config); + pickFirst.updateAddressList(endpoints, config); process.nextTick(() => { - pickFirst.updateAddressList(addresses, config); + pickFirst.updateAddressList(endpoints, config); process.nextTick(() => { assert(pickedSubchannels.size === 1); done(); diff --git a/packages/grpc-js/test/test-resolver.ts b/packages/grpc-js/test/test-resolver.ts index 98d74823..c8836728 100644 --- a/packages/grpc-js/test/test-resolver.ts +++ b/packages/grpc-js/test/test-resolver.ts @@ -25,12 +25,27 @@ import * as resolver_ip from '../src/resolver-ip'; import { ServiceConfig } from '../src/service-config'; import { StatusObject } from '../src/call-interface'; import { + Endpoint, SubchannelAddress, - isTcpSubchannelAddress, - subchannelAddressToString, + endpointToString, + subchannelAddressEqual, } from '../src/subchannel-address'; import { parseUri, GrpcUri } from '../src/uri-parser'; +function hasMatchingAddress( + endpointList: Endpoint[], + expectedAddress: SubchannelAddress +): boolean { + for (const endpoint of endpointList) { + for (const address of endpoint.addresses) { + if (subchannelAddressEqual(address, expectedAddress)) { + return true; + } + } + } + return false; +} + describe('Name Resolver', () => { before(() => { resolver_dns.setup(); @@ -46,27 +61,17 @@ describe('Name Resolver', () => { )!; const listener: resolverManager.ResolverListener = { onSuccessfulResolution: ( - addressList: SubchannelAddress[], + endpointList: Endpoint[], serviceConfig: ServiceConfig | null, serviceConfigError: StatusObject | null ) => { // Only handle the first resolution result listener.onSuccessfulResolution = () => {}; assert( - addressList.some( - addr => - isTcpSubchannelAddress(addr) && - addr.host === '127.0.0.1' && - addr.port === 50051 - ) + hasMatchingAddress(endpointList, { host: '127.0.0.1', port: 50051 }) ); assert( - addressList.some( - addr => - isTcpSubchannelAddress(addr) && - addr.host === '::1' && - addr.port === 50051 - ) + hasMatchingAddress(endpointList, { host: '::1', port: 50051 }) ); done(); }, @@ -83,28 +88,16 @@ describe('Name Resolver', () => { )!; const listener: resolverManager.ResolverListener = { onSuccessfulResolution: ( - addressList: SubchannelAddress[], + endpointList: Endpoint[], serviceConfig: ServiceConfig | null, serviceConfigError: StatusObject | null ) => { // Only handle the first resolution result listener.onSuccessfulResolution = () => {}; assert( - addressList.some( - addr => - isTcpSubchannelAddress(addr) && - addr.host === '127.0.0.1' && - addr.port === 443 - ) - ); - assert( - addressList.some( - addr => - isTcpSubchannelAddress(addr) && - addr.host === '::1' && - addr.port === 443 - ) + hasMatchingAddress(endpointList, { host: '127.0.0.1', port: 443 }) ); + assert(hasMatchingAddress(endpointList, { host: '::1', port: 443 })); done(); }, onError: (error: StatusObject) => { @@ -118,19 +111,14 @@ describe('Name Resolver', () => { const target = resolverManager.mapUriDefaultScheme(parseUri('1.2.3.4')!)!; const listener: resolverManager.ResolverListener = { onSuccessfulResolution: ( - addressList: SubchannelAddress[], + endpointList: Endpoint[], serviceConfig: ServiceConfig | null, serviceConfigError: StatusObject | null ) => { // Only handle the first resolution result listener.onSuccessfulResolution = () => {}; assert( - addressList.some( - addr => - isTcpSubchannelAddress(addr) && - addr.host === '1.2.3.4' && - addr.port === 443 - ) + hasMatchingAddress(endpointList, { host: '1.2.3.4', port: 443 }) ); done(); }, @@ -145,20 +133,13 @@ describe('Name Resolver', () => { const target = resolverManager.mapUriDefaultScheme(parseUri('::1')!)!; const listener: resolverManager.ResolverListener = { onSuccessfulResolution: ( - addressList: SubchannelAddress[], + endpointList: Endpoint[], serviceConfig: ServiceConfig | null, serviceConfigError: StatusObject | null ) => { // Only handle the first resolution result listener.onSuccessfulResolution = () => {}; - assert( - addressList.some( - addr => - isTcpSubchannelAddress(addr) && - addr.host === '::1' && - addr.port === 443 - ) - ); + assert(hasMatchingAddress(endpointList, { host: '::1', port: 443 })); done(); }, onError: (error: StatusObject) => { @@ -174,19 +155,14 @@ describe('Name Resolver', () => { )!; const listener: resolverManager.ResolverListener = { onSuccessfulResolution: ( - addressList: SubchannelAddress[], + endpointList: Endpoint[], serviceConfig: ServiceConfig | null, serviceConfigError: StatusObject | null ) => { // Only handle the first resolution result listener.onSuccessfulResolution = () => {}; assert( - addressList.some( - addr => - isTcpSubchannelAddress(addr) && - addr.host === '::1' && - addr.port === 50051 - ) + hasMatchingAddress(endpointList, { host: '::1', port: 50051 }) ); done(); }, @@ -203,13 +179,13 @@ describe('Name Resolver', () => { )!; const listener: resolverManager.ResolverListener = { onSuccessfulResolution: ( - addressList: SubchannelAddress[], + endpointList: Endpoint[], serviceConfig: ServiceConfig | null, serviceConfigError: StatusObject | null ) => { // Only handle the first resolution result listener.onSuccessfulResolution = () => {}; - assert(addressList.length > 0); + assert(endpointList.length > 0); done(); }, onError: (error: StatusObject) => { @@ -227,7 +203,7 @@ describe('Name Resolver', () => { )!; const listener: resolverManager.ResolverListener = { onSuccessfulResolution: ( - addressList: SubchannelAddress[], + endpointList: Endpoint[], serviceConfig: ServiceConfig | null, serviceConfigError: StatusObject | null ) => { @@ -253,7 +229,7 @@ describe('Name Resolver', () => { let count = 0; const listener: resolverManager.ResolverListener = { onSuccessfulResolution: ( - addressList: SubchannelAddress[], + endpointList: Endpoint[], serviceConfig: ServiceConfig | null, serviceConfigError: StatusObject | null ) => { @@ -290,21 +266,16 @@ describe('Name Resolver', () => { )!; const listener: resolverManager.ResolverListener = { onSuccessfulResolution: ( - addressList: SubchannelAddress[], + endpointList: Endpoint[], serviceConfig: ServiceConfig | null, serviceConfigError: StatusObject | null ) => { // Only handle the first resolution result listener.onSuccessfulResolution = () => {}; assert( - addressList.some( - addr => - isTcpSubchannelAddress(addr) && - addr.host === '127.0.0.1' && - addr.port === 443 - ), - `None of [${addressList.map(addr => - subchannelAddressToString(addr) + hasMatchingAddress(endpointList, { host: '127.0.0.1', port: 443 }), + `None of [${endpointList.map(addr => + endpointToString(addr) )}] matched '127.0.0.1:443'` ); done(); @@ -324,20 +295,13 @@ describe('Name Resolver', () => { )!; const listener: resolverManager.ResolverListener = { onSuccessfulResolution: ( - addressList: SubchannelAddress[], + endpointList: Endpoint[], serviceConfig: ServiceConfig | null, serviceConfigError: StatusObject | null ) => { // Only handle the first resolution result listener.onSuccessfulResolution = () => {}; - assert( - addressList.some( - addr => - isTcpSubchannelAddress(addr) && - addr.host === '::1' && - addr.port === 443 - ) - ); + assert(hasMatchingAddress(endpointList, { host: '::1', port: 443 })); done(); }, onError: (error: StatusObject) => { @@ -356,21 +320,16 @@ describe('Name Resolver', () => { )!; const listener: resolverManager.ResolverListener = { onSuccessfulResolution: ( - addressList: SubchannelAddress[], + endpointList: Endpoint[], serviceConfig: ServiceConfig | null, serviceConfigError: StatusObject | null ) => { // Only handle the first resolution result listener.onSuccessfulResolution = () => {}; assert( - addressList.some( - addr => - isTcpSubchannelAddress(addr) && - addr.host === '127.0.0.1' && - addr.port === 443 - ), - `None of [${addressList.map(addr => - subchannelAddressToString(addr) + hasMatchingAddress(endpointList, { host: '127.0.0.1', port: 443 }), + `None of [${endpointList.map(addr => + endpointToString(addr) )}] matched '127.0.0.1:443'` ); /* TODO(murgatroid99): check for IPv6 result, once we can get that @@ -392,13 +351,13 @@ describe('Name Resolver', () => { )!; const listener: resolverManager.ResolverListener = { onSuccessfulResolution: ( - addressList: SubchannelAddress[], + endpointList: Endpoint[], serviceConfig: ServiceConfig | null, serviceConfigError: StatusObject | null ) => { // Only handle the first resolution result listener.onSuccessfulResolution = () => {}; - assert(addressList.length > 0); + assert(endpointList.length > 0); done(); }, onError: (error: StatusObject) => { @@ -422,11 +381,11 @@ describe('Name Resolver', () => { )!; const listener: resolverManager.ResolverListener = { onSuccessfulResolution: ( - addressList: SubchannelAddress[], + endpointList: Endpoint[], serviceConfig: ServiceConfig | null, serviceConfigError: StatusObject | null ) => { - assert(addressList.length > 0); + assert(endpointList.length > 0); completeCount += 1; if (completeCount === 2) { // Only handle the first resolution result @@ -452,25 +411,15 @@ describe('Name Resolver', () => { target, { onSuccessfulResolution: ( - addressList: SubchannelAddress[], + endpointList: Endpoint[], serviceConfig: ServiceConfig | null, serviceConfigError: StatusObject | null ) => { assert( - addressList.some( - addr => - isTcpSubchannelAddress(addr) && - addr.host === '127.0.0.1' && - addr.port === 443 - ) + hasMatchingAddress(endpointList, { host: '127.0.0.1', port: 443 }) ); assert( - addressList.some( - addr => - isTcpSubchannelAddress(addr) && - addr.host === '::1' && - addr.port === 443 - ) + hasMatchingAddress(endpointList, { host: '::1', port: 443 }) ); resultCount += 1; if (resultCount === 1) { @@ -498,7 +447,7 @@ describe('Name Resolver', () => { target, { onSuccessfulResolution: ( - addressList: SubchannelAddress[], + endpointList: Endpoint[], serviceConfig: ServiceConfig | null, serviceConfigError: StatusObject | null ) => { @@ -527,17 +476,13 @@ describe('Name Resolver', () => { )!; const listener: resolverManager.ResolverListener = { onSuccessfulResolution: ( - addressList: SubchannelAddress[], + endpointList: Endpoint[], serviceConfig: ServiceConfig | null, serviceConfigError: StatusObject | null ) => { // Only handle the first resolution result listener.onSuccessfulResolution = () => {}; - assert( - addressList.some( - addr => !isTcpSubchannelAddress(addr) && addr.path === 'socket' - ) - ); + assert(hasMatchingAddress(endpointList, { path: 'socket' })); done(); }, onError: (error: StatusObject) => { @@ -553,18 +498,13 @@ describe('Name Resolver', () => { )!; const listener: resolverManager.ResolverListener = { onSuccessfulResolution: ( - addressList: SubchannelAddress[], + endpointList: Endpoint[], serviceConfig: ServiceConfig | null, serviceConfigError: StatusObject | null ) => { // Only handle the first resolution result listener.onSuccessfulResolution = () => {}; - assert( - addressList.some( - addr => - !isTcpSubchannelAddress(addr) && addr.path === '/tmp/socket' - ) - ); + assert(hasMatchingAddress(endpointList, { path: '/tmp/socket' })); done(); }, onError: (error: StatusObject) => { @@ -582,19 +522,14 @@ describe('Name Resolver', () => { )!; const listener: resolverManager.ResolverListener = { onSuccessfulResolution: ( - addressList: SubchannelAddress[], + endpointList: Endpoint[], serviceConfig: ServiceConfig | null, serviceConfigError: StatusObject | null ) => { // Only handle the first resolution result listener.onSuccessfulResolution = () => {}; assert( - addressList.some( - addr => - isTcpSubchannelAddress(addr) && - addr.host === '127.0.0.1' && - addr.port === 443 - ) + hasMatchingAddress(endpointList, { host: '127.0.0.1', port: 443 }) ); done(); }, @@ -611,19 +546,14 @@ describe('Name Resolver', () => { )!; const listener: resolverManager.ResolverListener = { onSuccessfulResolution: ( - addressList: SubchannelAddress[], + endpointList: Endpoint[], serviceConfig: ServiceConfig | null, serviceConfigError: StatusObject | null ) => { // Only handle the first resolution result listener.onSuccessfulResolution = () => {}; assert( - addressList.some( - addr => - isTcpSubchannelAddress(addr) && - addr.host === '127.0.0.1' && - addr.port === 50051 - ) + hasMatchingAddress(endpointList, { host: '127.0.0.1', port: 50051 }) ); done(); }, @@ -640,27 +570,17 @@ describe('Name Resolver', () => { )!; const listener: resolverManager.ResolverListener = { onSuccessfulResolution: ( - addressList: SubchannelAddress[], + endpointList: Endpoint[], serviceConfig: ServiceConfig | null, serviceConfigError: StatusObject | null ) => { // Only handle the first resolution result listener.onSuccessfulResolution = () => {}; assert( - addressList.some( - addr => - isTcpSubchannelAddress(addr) && - addr.host === '127.0.0.1' && - addr.port === 50051 - ) + hasMatchingAddress(endpointList, { host: '127.0.0.1', port: 50051 }) ); assert( - addressList.some( - addr => - isTcpSubchannelAddress(addr) && - addr.host === '127.0.0.1' && - addr.port === 50052 - ) + hasMatchingAddress(endpointList, { host: '127.0.0.1', port: 50052 }) ); done(); }, @@ -677,20 +597,13 @@ describe('Name Resolver', () => { )!; const listener: resolverManager.ResolverListener = { onSuccessfulResolution: ( - addressList: SubchannelAddress[], + endpointList: Endpoint[], serviceConfig: ServiceConfig | null, serviceConfigError: StatusObject | null ) => { // Only handle the first resolution result listener.onSuccessfulResolution = () => {}; - assert( - addressList.some( - addr => - isTcpSubchannelAddress(addr) && - addr.host === '::1' && - addr.port === 443 - ) - ); + assert(hasMatchingAddress(endpointList, { host: '::1', port: 443 })); done(); }, onError: (error: StatusObject) => { @@ -706,19 +619,14 @@ describe('Name Resolver', () => { )!; const listener: resolverManager.ResolverListener = { onSuccessfulResolution: ( - addressList: SubchannelAddress[], + endpointList: Endpoint[], serviceConfig: ServiceConfig | null, serviceConfigError: StatusObject | null ) => { // Only handle the first resolution result listener.onSuccessfulResolution = () => {}; assert( - addressList.some( - addr => - isTcpSubchannelAddress(addr) && - addr.host === '::1' && - addr.port === 50051 - ) + hasMatchingAddress(endpointList, { host: '::1', port: 50051 }) ); done(); }, @@ -735,27 +643,17 @@ describe('Name Resolver', () => { )!; const listener: resolverManager.ResolverListener = { onSuccessfulResolution: ( - addressList: SubchannelAddress[], + endpointList: Endpoint[], serviceConfig: ServiceConfig | null, serviceConfigError: StatusObject | null ) => { // Only handle the first resolution result listener.onSuccessfulResolution = () => {}; assert( - addressList.some( - addr => - isTcpSubchannelAddress(addr) && - addr.host === '::1' && - addr.port === 50051 - ) + hasMatchingAddress(endpointList, { host: '::1', port: 50051 }) ); assert( - addressList.some( - addr => - isTcpSubchannelAddress(addr) && - addr.host === '::1' && - addr.port === 50052 - ) + hasMatchingAddress(endpointList, { host: '::1', port: 50052 }) ); done(); }, From e919aa7aa375385bba9661496e67f9ca243bf032 Mon Sep 17 00:00:00 2001 From: Michael Lumish Date: Wed, 30 Aug 2023 14:47:06 -0700 Subject: [PATCH 2/4] grpc-js-xds: Update LB policies to handle grpc-js changes --- .../grpc-js-xds/interop/xds-interop-client.ts | 6 +-- packages/grpc-js-xds/src/load-balancer-cds.ts | 11 +---- .../grpc-js-xds/src/load-balancer-priority.ts | 44 +++++++++---------- .../src/load-balancer-weighted-target.ts | 30 ++++++------- .../src/load-balancer-xds-cluster-impl.ts | 27 +++++++++--- .../src/load-balancer-xds-cluster-manager.ts | 12 ++--- .../src/load-balancer-xds-cluster-resolver.ts | 40 +++++++++-------- .../src/load-balancer-xds-wrr-locality.ts | 12 ++--- .../test/test-custom-lb-policies.ts | 6 +-- 9 files changed, 99 insertions(+), 89 deletions(-) diff --git a/packages/grpc-js-xds/interop/xds-interop-client.ts b/packages/grpc-js-xds/interop/xds-interop-client.ts index f9034ed8..dc70034f 100644 --- a/packages/grpc-js-xds/interop/xds-interop-client.ts +++ b/packages/grpc-js-xds/interop/xds-interop-client.ts @@ -34,7 +34,7 @@ import TypedLoadBalancingConfig = grpc.experimental.TypedLoadBalancingConfig; import LoadBalancer = grpc.experimental.LoadBalancer; import ChannelControlHelper = grpc.experimental.ChannelControlHelper; import ChildLoadBalancerHandler = grpc.experimental.ChildLoadBalancerHandler; -import SubchannelAddress = grpc.experimental.SubchannelAddress; +import Endpoint = grpc.experimental.Endpoint; import Picker = grpc.experimental.Picker; import PickArgs = grpc.experimental.PickArgs; import PickResult = grpc.experimental.PickResult; @@ -99,12 +99,12 @@ class RpcBehaviorLoadBalancer implements LoadBalancer { }); this.child = new ChildLoadBalancerHandler(childChannelControlHelper); } - updateAddressList(addressList: SubchannelAddress[], lbConfig: TypedLoadBalancingConfig, attributes: { [key: string]: unknown; }): void { + updateAddressList(endpointList: Endpoint[], lbConfig: TypedLoadBalancingConfig, attributes: { [key: string]: unknown; }): void { if (!(lbConfig instanceof RpcBehaviorLoadBalancingConfig)) { return; } this.latestConfig = lbConfig; - this.child.updateAddressList(addressList, RPC_BEHAVIOR_CHILD_CONFIG, attributes); + this.child.updateAddressList(endpointList, RPC_BEHAVIOR_CHILD_CONFIG, attributes); } exitIdle(): void { this.child.exitIdle(); diff --git a/packages/grpc-js-xds/src/load-balancer-cds.ts b/packages/grpc-js-xds/src/load-balancer-cds.ts index 647ab2b9..1e4f63b4 100644 --- a/packages/grpc-js-xds/src/load-balancer-cds.ts +++ b/packages/grpc-js-xds/src/load-balancer-cds.ts @@ -18,23 +18,16 @@ import { connectivityState, status, Metadata, logVerbosity, experimental, LoadBalancingConfig } from '@grpc/grpc-js'; import { getSingletonXdsClient, Watcher, XdsClient } from './xds-client'; import { Cluster__Output } from './generated/envoy/config/cluster/v3/Cluster'; -import SubchannelAddress = experimental.SubchannelAddress; +import Endpoint = experimental.Endpoint; import UnavailablePicker = experimental.UnavailablePicker; import ChildLoadBalancerHandler = experimental.ChildLoadBalancerHandler; import LoadBalancer = experimental.LoadBalancer; import ChannelControlHelper = experimental.ChannelControlHelper; import registerLoadBalancerType = experimental.registerLoadBalancerType; import TypedLoadBalancingConfig = experimental.TypedLoadBalancingConfig; -import SuccessRateEjectionConfig = experimental.SuccessRateEjectionConfig; -import FailurePercentageEjectionConfig = experimental.FailurePercentageEjectionConfig; import QueuePicker = experimental.QueuePicker; -import OutlierDetectionRawConfig = experimental.OutlierDetectionRawConfig; import parseLoadBalancingConfig = experimental.parseLoadBalancingConfig; -import { OutlierDetection__Output } from './generated/envoy/config/cluster/v3/OutlierDetection'; -import { Duration__Output } from './generated/google/protobuf/Duration'; -import { EXPERIMENTAL_OUTLIER_DETECTION } from './environment'; import { DiscoveryMechanism, XdsClusterResolverChildPolicyHandler } from './load-balancer-xds-cluster-resolver'; -import { CLUSTER_CONFIG_TYPE_URL, decodeSingleResource } from './resources'; import { CdsUpdate, ClusterResourceType } from './xds-resource-type/cluster-resource-type'; const TRACER_NAME = 'cds_balancer'; @@ -258,7 +251,7 @@ export class CdsLoadBalancer implements LoadBalancer { } updateAddressList( - addressList: SubchannelAddress[], + endpointList: Endpoint[], lbConfig: TypedLoadBalancingConfig, attributes: { [key: string]: unknown } ): void { diff --git a/packages/grpc-js-xds/src/load-balancer-priority.ts b/packages/grpc-js-xds/src/load-balancer-priority.ts index 4a3e41a1..ba4fd2cf 100644 --- a/packages/grpc-js-xds/src/load-balancer-priority.ts +++ b/packages/grpc-js-xds/src/load-balancer-priority.ts @@ -19,8 +19,8 @@ import { connectivityState as ConnectivityState, status as Status, Metadata, log import LoadBalancer = experimental.LoadBalancer; import ChannelControlHelper = experimental.ChannelControlHelper; import registerLoadBalancerType = experimental.registerLoadBalancerType; -import SubchannelAddress = experimental.SubchannelAddress; -import subchannelAddressToString = experimental.subchannelAddressToString; +import Endpoint = experimental.Endpoint; +import endpointToString = experimental.endpointToString; import TypedLoadBalancingConfig = experimental.TypedLoadBalancingConfig; import Picker = experimental.Picker; import QueuePicker = experimental.QueuePicker; @@ -40,16 +40,16 @@ const TYPE_NAME = 'priority'; const DEFAULT_FAILOVER_TIME_MS = 10_000; const DEFAULT_RETENTION_INTERVAL_MS = 15 * 60 * 1000; -export type LocalitySubchannelAddress = SubchannelAddress & { +export interface LocalityEndpoint extends Endpoint { localityPath: string[]; locality: Locality__Output; weight: number; }; -export function isLocalitySubchannelAddress( - address: SubchannelAddress -): address is LocalitySubchannelAddress { - return Array.isArray((address as LocalitySubchannelAddress).localityPath); +export function isLocalityEndpoint( + address: Endpoint +): address is LocalityEndpoint { + return Array.isArray((address as LocalityEndpoint).localityPath); } /** @@ -138,7 +138,7 @@ class PriorityLoadBalancingConfig implements TypedLoadBalancingConfig { interface PriorityChildBalancer { updateAddressList( - addressList: SubchannelAddress[], + endpointList: Endpoint[], lbConfig: TypedLoadBalancingConfig, attributes: { [key: string]: unknown } ): void; @@ -154,7 +154,7 @@ interface PriorityChildBalancer { } interface UpdateArgs { - subchannelAddress: SubchannelAddress[]; + subchannelAddress: Endpoint[]; lbConfig: TypedLoadBalancingConfig; ignoreReresolutionRequests: boolean; } @@ -218,11 +218,11 @@ export class PriorityLoadBalancer implements LoadBalancer { } updateAddressList( - addressList: SubchannelAddress[], + endpointList: Endpoint[], lbConfig: TypedLoadBalancingConfig, attributes: { [key: string]: unknown } ): void { - this.childBalancer.updateAddressList(addressList, lbConfig, attributes); + this.childBalancer.updateAddressList(endpointList, lbConfig, attributes); } exitIdle() { @@ -412,7 +412,7 @@ export class PriorityLoadBalancer implements LoadBalancer { } updateAddressList( - addressList: SubchannelAddress[], + endpointList: Endpoint[], lbConfig: TypedLoadBalancingConfig, attributes: { [key: string]: unknown } ): void { @@ -425,23 +425,23 @@ export class PriorityLoadBalancer implements LoadBalancer { * which child it belongs to. So we bucket those addresses by that first * element, and pass along the rest of the localityPath for that child * to use. */ - const childAddressMap: Map = new Map< + const childAddressMap: Map = new Map< string, - LocalitySubchannelAddress[] + LocalityEndpoint[] >(); - for (const address of addressList) { - if (!isLocalitySubchannelAddress(address)) { + for (const endpoint of endpointList) { + if (!isLocalityEndpoint(endpoint)) { // Reject address that cannot be prioritized return; } - if (address.localityPath.length < 1) { + if (endpoint.localityPath.length < 1) { // Reject address that cannot be prioritized return; } - const childName = address.localityPath[0]; - const childAddress: LocalitySubchannelAddress = { - ...address, - localityPath: address.localityPath.slice(1), + const childName = endpoint.localityPath[0]; + const childAddress: LocalityEndpoint = { + ...endpoint, + localityPath: endpoint.localityPath.slice(1), }; let childAddressList = childAddressMap.get(childName); if (childAddressList === undefined) { @@ -458,7 +458,7 @@ export class PriorityLoadBalancer implements LoadBalancer { * update all existing children with their new configs */ for (const [childName, childConfig] of lbConfig.getChildren()) { const childAddresses = childAddressMap.get(childName) ?? []; - trace('Assigning child ' + childName + ' address list ' + childAddresses.map(address => '(' + subchannelAddressToString(address) + ' path=' + address.localityPath + ')')) + trace('Assigning child ' + childName + ' endpoint list ' + childAddresses.map(endpoint => '(' + endpointToString(endpoint) + ' path=' + endpoint.localityPath + ')')) this.latestUpdates.set(childName, { subchannelAddress: childAddresses, lbConfig: childConfig.config, diff --git a/packages/grpc-js-xds/src/load-balancer-weighted-target.ts b/packages/grpc-js-xds/src/load-balancer-weighted-target.ts index 231f3b17..16874e01 100644 --- a/packages/grpc-js-xds/src/load-balancer-weighted-target.ts +++ b/packages/grpc-js-xds/src/load-balancer-weighted-target.ts @@ -16,7 +16,7 @@ */ import { connectivityState as ConnectivityState, status as Status, Metadata, logVerbosity, experimental, LoadBalancingConfig } from "@grpc/grpc-js"; -import { isLocalitySubchannelAddress, LocalitySubchannelAddress } from "./load-balancer-priority"; +import { isLocalityEndpoint, LocalityEndpoint } from "./load-balancer-priority"; import TypedLoadBalancingConfig = experimental.TypedLoadBalancingConfig; import LoadBalancer = experimental.LoadBalancer; import ChannelControlHelper = experimental.ChannelControlHelper; @@ -27,8 +27,8 @@ import PickResult = experimental.PickResult; import PickArgs = experimental.PickArgs; import QueuePicker = experimental.QueuePicker; import UnavailablePicker = experimental.UnavailablePicker; -import SubchannelAddress = experimental.SubchannelAddress; -import subchannelAddressToString = experimental.subchannelAddressToString; +import Endpoint = experimental.Endpoint; +import endpointToString = experimental.endpointToString; import selectLbConfigFromList = experimental.selectLbConfigFromList; const TRACER_NAME = 'weighted_target'; @@ -154,7 +154,7 @@ class WeightedTargetPicker implements Picker { } interface WeightedChild { - updateAddressList(addressList: SubchannelAddress[], lbConfig: WeightedTarget, attributes: { [key: string]: unknown; }): void; + updateAddressList(endpointList: Endpoint[], lbConfig: WeightedTarget, attributes: { [key: string]: unknown; }): void; exitIdle(): void; resetBackoff(): void; destroy(): void; @@ -190,9 +190,9 @@ export class WeightedTargetLoadBalancer implements LoadBalancer { this.parent.maybeUpdateState(); } - updateAddressList(addressList: SubchannelAddress[], lbConfig: WeightedTarget, attributes: { [key: string]: unknown; }): void { + updateAddressList(endpointList: Endpoint[], lbConfig: WeightedTarget, attributes: { [key: string]: unknown; }): void { this.weight = lbConfig.weight; - this.childBalancer.updateAddressList(addressList, lbConfig.child_policy, attributes); + this.childBalancer.updateAddressList(endpointList, lbConfig.child_policy, attributes); } exitIdle(): void { this.childBalancer.exitIdle(); @@ -319,7 +319,7 @@ export class WeightedTargetLoadBalancer implements LoadBalancer { this.channelControlHelper.updateState(connectivityState, picker); } - updateAddressList(addressList: SubchannelAddress[], lbConfig: TypedLoadBalancingConfig, attributes: { [key: string]: unknown; }): void { + updateAddressList(addressList: Endpoint[], lbConfig: TypedLoadBalancingConfig, attributes: { [key: string]: unknown; }): void { if (!(lbConfig instanceof WeightedTargetLoadBalancingConfig)) { // Reject a config of the wrong type trace('Discarding address list update with unrecognized config ' + JSON.stringify(lbConfig.toJsonObject(), undefined, 2)); @@ -330,9 +330,9 @@ export class WeightedTargetLoadBalancer implements LoadBalancer { * which child it belongs to. So we bucket those addresses by that first * element, and pass along the rest of the localityPath for that child * to use. */ - const childAddressMap = new Map(); + const childEndpointMap = new Map(); for (const address of addressList) { - if (!isLocalitySubchannelAddress(address)) { + if (!isLocalityEndpoint(address)) { // Reject address that cannot be associated with targets return; } @@ -341,14 +341,14 @@ export class WeightedTargetLoadBalancer implements LoadBalancer { return; } const childName = address.localityPath[0]; - const childAddress: LocalitySubchannelAddress = { + const childAddress: LocalityEndpoint = { ...address, localityPath: address.localityPath.slice(1), }; - let childAddressList = childAddressMap.get(childName); + let childAddressList = childEndpointMap.get(childName); if (childAddressList === undefined) { childAddressList = []; - childAddressMap.set(childName, childAddressList); + childEndpointMap.set(childName, childAddressList); } childAddressList.push(childAddress); } @@ -363,9 +363,9 @@ export class WeightedTargetLoadBalancer implements LoadBalancer { } else { target.maybeReactivate(); } - const targetAddresses = childAddressMap.get(targetName) ?? []; - trace('Assigning target ' + targetName + ' address list ' + targetAddresses.map(address => '(' + subchannelAddressToString(address) + ' path=' + address.localityPath + ')')); - target.updateAddressList(targetAddresses, targetConfig, attributes); + const targetEndpoints = childEndpointMap.get(targetName) ?? []; + trace('Assigning target ' + targetName + ' address list ' + targetEndpoints.map(endpoint => '(' + endpointToString(endpoint) + ' path=' + endpoint.localityPath + ')')); + target.updateAddressList(targetEndpoints, targetConfig, attributes); } // Deactivate targets that are not in the new config diff --git a/packages/grpc-js-xds/src/load-balancer-xds-cluster-impl.ts b/packages/grpc-js-xds/src/load-balancer-xds-cluster-impl.ts index 926c7a69..81e36562 100644 --- a/packages/grpc-js-xds/src/load-balancer-xds-cluster-impl.ts +++ b/packages/grpc-js-xds/src/load-balancer-xds-cluster-impl.ts @@ -18,11 +18,13 @@ import { experimental, logVerbosity, status as Status, Metadata, connectivityState } from "@grpc/grpc-js"; import { validateXdsServerConfig, XdsServerConfig } from "./xds-bootstrap"; import { getSingletonXdsClient, XdsClient, XdsClusterDropStats, XdsClusterLocalityStats } from "./xds-client"; -import { LocalitySubchannelAddress } from "./load-balancer-priority"; +import { LocalityEndpoint } from "./load-balancer-priority"; import LoadBalancer = experimental.LoadBalancer; import registerLoadBalancerType = experimental.registerLoadBalancerType; -import SubchannelAddress = experimental.SubchannelAddress; +import Endpoint = experimental.Endpoint; +import endpointHasAddress = experimental.endpointHasAddress; +import subchannelAddressToString = experimental.subchannelAddressToString; import Picker = experimental.Picker; import PickArgs = experimental.PickArgs; import PickResult = experimental.PickResult; @@ -34,6 +36,7 @@ import TypedLoadBalancingConfig = experimental.TypedLoadBalancingConfig; import selectLbConfigFromList = experimental.selectLbConfigFromList; import SubchannelInterface = experimental.SubchannelInterface; import BaseSubchannelWrapper = experimental.BaseSubchannelWrapper; +import { Locality__Output } from "./generated/envoy/config/core/v3/Locality"; const TRACER_NAME = 'xds_cluster_impl'; @@ -245,6 +248,7 @@ function getCallCounterMapKey(cluster: string, edsServiceName?: string): string class XdsClusterImplBalancer implements LoadBalancer { private childBalancer: ChildLoadBalancerHandler; + private lastestEndpointList: Endpoint[] | null = null; private latestConfig: XdsClusterImplLoadBalancingConfig | null = null; private clusterDropStats: XdsClusterDropStats | null = null; private xdsClient: XdsClient | null = null; @@ -252,11 +256,20 @@ class XdsClusterImplBalancer implements LoadBalancer { constructor(private readonly channelControlHelper: ChannelControlHelper) { this.childBalancer = new ChildLoadBalancerHandler(createChildChannelControlHelper(channelControlHelper, { createSubchannel: (subchannelAddress, subchannelArgs) => { - if (!this.xdsClient || !this.latestConfig) { + if (!this.xdsClient || !this.latestConfig || !this.lastestEndpointList) { throw new Error('xds_cluster_impl: invalid state: createSubchannel called with xdsClient or latestConfig not populated'); } - const locality = (subchannelAddress as LocalitySubchannelAddress).locality ?? ''; const wrapperChild = channelControlHelper.createSubchannel(subchannelAddress, subchannelArgs); + let locality: Locality__Output | null = null; + for (const endpoint of this.lastestEndpointList) { + if (endpointHasAddress(endpoint, subchannelAddress)) { + locality = (endpoint as LocalityEndpoint).locality; + } + } + if (locality === null) { + trace('Not reporting load for address ' + subchannelAddressToString(subchannelAddress) + ' because it has unknown locality.'); + return wrapperChild; + } const lrsServer = this.latestConfig.getLrsLoadReportingServer(); let statsObj: XdsClusterLocalityStats | null = null; if (lrsServer) { @@ -279,15 +292,15 @@ class XdsClusterImplBalancer implements LoadBalancer { } })); } - updateAddressList(addressList: SubchannelAddress[], lbConfig: TypedLoadBalancingConfig, attributes: { [key: string]: unknown; }): void { + updateAddressList(endpointList: Endpoint[], lbConfig: TypedLoadBalancingConfig, attributes: { [key: string]: unknown; }): void { if (!(lbConfig instanceof XdsClusterImplLoadBalancingConfig)) { trace('Discarding address list update with unrecognized config ' + JSON.stringify(lbConfig.toJsonObject(), undefined, 2)); return; } trace('Received update with config: ' + JSON.stringify(lbConfig, undefined, 2)); + this.lastestEndpointList = endpointList; this.latestConfig = lbConfig; this.xdsClient = attributes.xdsClient as XdsClient; - if (lbConfig.getLrsLoadReportingServer()) { this.clusterDropStats = this.xdsClient.addClusterDropStats( lbConfig.getLrsLoadReportingServer()!, @@ -296,7 +309,7 @@ class XdsClusterImplBalancer implements LoadBalancer { ); } - this.childBalancer.updateAddressList(addressList, lbConfig.getChildPolicy(), attributes); + this.childBalancer.updateAddressList(endpointList, lbConfig.getChildPolicy(), attributes); } exitIdle(): void { this.childBalancer.exitIdle(); diff --git a/packages/grpc-js-xds/src/load-balancer-xds-cluster-manager.ts b/packages/grpc-js-xds/src/load-balancer-xds-cluster-manager.ts index ce3207df..a1855d14 100644 --- a/packages/grpc-js-xds/src/load-balancer-xds-cluster-manager.ts +++ b/packages/grpc-js-xds/src/load-balancer-xds-cluster-manager.ts @@ -25,7 +25,7 @@ import PickArgs = experimental.PickArgs; import PickResultType = experimental.PickResultType; import UnavailablePicker = experimental.UnavailablePicker; import QueuePicker = experimental.QueuePicker; -import SubchannelAddress = experimental.SubchannelAddress; +import Endpoint = experimental.Endpoint; import ChildLoadBalancerHandler = experimental.ChildLoadBalancerHandler; import ChannelControlHelper = experimental.ChannelControlHelper; import selectLbConfigFromList = experimental.selectLbConfigFromList; @@ -111,7 +111,7 @@ class XdsClusterManagerPicker implements Picker { } interface XdsClusterManagerChild { - updateAddressList(addressList: SubchannelAddress[], childConfig: TypedLoadBalancingConfig, attributes: { [key: string]: unknown; }): void; + updateAddressList(endpointList: Endpoint[], childConfig: TypedLoadBalancingConfig, attributes: { [key: string]: unknown; }): void; exitIdle(): void; resetBackoff(): void; destroy(): void; @@ -142,8 +142,8 @@ class XdsClusterManager implements LoadBalancer { this.picker = picker; this.parent.maybeUpdateState(); } - updateAddressList(addressList: SubchannelAddress[], childConfig: TypedLoadBalancingConfig, attributes: { [key: string]: unknown; }): void { - this.childBalancer.updateAddressList(addressList, childConfig, attributes); + updateAddressList(endpointList: Endpoint[], childConfig: TypedLoadBalancingConfig, attributes: { [key: string]: unknown; }): void { + this.childBalancer.updateAddressList(endpointList, childConfig, attributes); } exitIdle(): void { this.childBalancer.exitIdle(); @@ -235,7 +235,7 @@ class XdsClusterManager implements LoadBalancer { this.channelControlHelper.updateState(connectivityState, picker); } - updateAddressList(addressList: SubchannelAddress[], lbConfig: TypedLoadBalancingConfig, attributes: { [key: string]: unknown; }): void { + updateAddressList(endpointList: Endpoint[], lbConfig: TypedLoadBalancingConfig, attributes: { [key: string]: unknown; }): void { if (!(lbConfig instanceof XdsClusterManagerLoadBalancingConfig)) { // Reject a config of the wrong type trace('Discarding address list update with unrecognized config ' + JSON.stringify(lbConfig.toJsonObject(), undefined, 2)); @@ -259,7 +259,7 @@ class XdsClusterManager implements LoadBalancer { for (const [name, childConfig] of configChildren.entries()) { if (!this.children.has(name)) { const newChild = new this.XdsClusterManagerChildImpl(this, name); - newChild.updateAddressList(addressList, childConfig, attributes); + newChild.updateAddressList(endpointList, childConfig, attributes); this.children.set(name, newChild); } } diff --git a/packages/grpc-js-xds/src/load-balancer-xds-cluster-resolver.ts b/packages/grpc-js-xds/src/load-balancer-xds-cluster-resolver.ts index 6c11c52b..752919c9 100644 --- a/packages/grpc-js-xds/src/load-balancer-xds-cluster-resolver.ts +++ b/packages/grpc-js-xds/src/load-balancer-xds-cluster-resolver.ts @@ -20,7 +20,7 @@ import { registerLoadBalancerType } from "@grpc/grpc-js/build/src/load-balancer" import { EXPERIMENTAL_OUTLIER_DETECTION } from "./environment"; import { Locality__Output } from "./generated/envoy/config/core/v3/Locality"; import { ClusterLoadAssignment__Output } from "./generated/envoy/config/endpoint/v3/ClusterLoadAssignment"; -import { LocalitySubchannelAddress, PriorityChildRaw } from "./load-balancer-priority"; +import { LocalityEndpoint, PriorityChildRaw } from "./load-balancer-priority"; import { getSingletonXdsClient, Watcher, XdsClient } from "./xds-client"; import { DropCategory } from "./load-balancer-xds-cluster-impl"; @@ -28,11 +28,13 @@ import TypedLoadBalancingConfig = experimental.TypedLoadBalancingConfig; import LoadBalancer = experimental.LoadBalancer; import Resolver = experimental.Resolver; import SubchannelAddress = experimental.SubchannelAddress; +import Endpoint = experimental.Endpoint; import ChildLoadBalancerHandler = experimental.ChildLoadBalancerHandler; import createResolver = experimental.createResolver; import ChannelControlHelper = experimental.ChannelControlHelper; import OutlierDetectionRawConfig = experimental.OutlierDetectionRawConfig; import subchannelAddressToString = experimental.subchannelAddressToString; +import endpointToString = experimental.endpointToString; import selectLbConfigFromList = experimental.selectLbConfigFromList; import parseLoadBalancingConfig = experimental.parseLoadBalancingConfig; import UnavailablePicker = experimental.UnavailablePicker; @@ -116,7 +118,7 @@ class XdsClusterResolverLoadBalancingConfig implements TypedLoadBalancingConfig interface LocalityEntry { locality: Locality__Output; weight: number; - addresses: SubchannelAddress[]; + endpoints: Endpoint[]; } interface PriorityEntry { @@ -164,18 +166,20 @@ function getEdsPriorities(edsUpdate: ClusterLoadAssignment__Output): PriorityEnt if (!endpoint.load_balancing_weight) { continue; } - const addresses: SubchannelAddress[] = endpoint.lb_endpoints.filter(lbEndpoint => lbEndpoint.health_status === 'UNKNOWN' || lbEndpoint.health_status === 'HEALTHY').map( + const endpoints: Endpoint[] = endpoint.lb_endpoints.filter(lbEndpoint => lbEndpoint.health_status === 'UNKNOWN' || lbEndpoint.health_status === 'HEALTHY').map( (lbEndpoint) => { /* The validator in the XdsClient class ensures that each endpoint has * a socket_address with an IP address and a port_value. */ const socketAddress = lbEndpoint.endpoint!.address!.socket_address!; return { - host: socketAddress.address!, - port: socketAddress.port_value!, + addresses: [{ + host: socketAddress.address!, + port: socketAddress.port_value!, + }] }; } ); - if (addresses.length === 0) { + if (endpoints.length === 0) { continue; } let priorityEntry: PriorityEntry; @@ -190,7 +194,7 @@ function getEdsPriorities(edsUpdate: ClusterLoadAssignment__Output): PriorityEnt } priorityEntry.localities.push({ locality: endpoint.locality!, - addresses: addresses, + endpoints: endpoints, weight: endpoint.load_balancing_weight.value }); } @@ -198,7 +202,7 @@ function getEdsPriorities(edsUpdate: ClusterLoadAssignment__Output): PriorityEnt return result.filter(priority => priority); } -function getDnsPriorities(addresses: SubchannelAddress[]): PriorityEntry[] { +function getDnsPriorities(endpoints: Endpoint[]): PriorityEntry[] { return [{ localities: [{ locality: { @@ -207,7 +211,7 @@ function getDnsPriorities(addresses: SubchannelAddress[]): PriorityEntry[] { sub_zone: '' }, weight: 1, - addresses: addresses + endpoints: endpoints }], dropCategories: [] }]; @@ -249,7 +253,7 @@ export class XdsClusterResolver implements LoadBalancer { } const fullPriorityList: string[] = []; const priorityChildren: {[name: string]: PriorityChildRaw} = {}; - const addressList: LocalitySubchannelAddress[] = []; + const endpointList: LocalityEndpoint[] = []; const edsChildPolicy = this.latestConfig.getXdsLbPolicy(); for (const entry of this.discoveryMechanismList) { const newPriorityNames: string[] = []; @@ -291,15 +295,15 @@ export class XdsClusterResolver implements LoadBalancer { newPriorityNames[priority] = newPriorityName; for (const localityObj of priorityEntry.localities) { - for (const address of localityObj.addresses) { - addressList.push({ + for (const endpoint of localityObj.endpoints) { + endpointList.push({ localityPath: [ newPriorityName, localityToName(localityObj.locality), ], locality: localityObj.locality, weight: localityObj.weight, - ...address, + ...endpoint }); } newLocalityPriorities.set(localityToName(localityObj.locality), priority); @@ -349,16 +353,16 @@ export class XdsClusterResolver implements LoadBalancer { this.channelControlHelper.updateState(connectivityState.TRANSIENT_FAILURE, new UnavailablePicker({code: status.UNAVAILABLE, details: `LB policy config parsing failed with error ${(e as Error).message}`, metadata: new Metadata()})); return; } - trace('Child update addresses: ' + addressList.map(address => '(' + subchannelAddressToString(address) + ' path=' + address.localityPath + ')')); + trace('Child update addresses: ' + endpointList.map(endpoint => '(' + endpointToString(endpoint) + ' path=' + endpoint.localityPath + ')')); trace('Child update priority config: ' + JSON.stringify(childConfig, undefined, 2)); this.childBalancer.updateAddressList( - addressList, + endpointList, typedChildConfig, this.latestAttributes ); } - updateAddressList(addressList: SubchannelAddress[], lbConfig: TypedLoadBalancingConfig, attributes: { [key: string]: unknown; }): void { + updateAddressList(addressList: Endpoint[], lbConfig: TypedLoadBalancingConfig, attributes: { [key: string]: unknown; }): void { if (!(lbConfig instanceof XdsClusterResolverLoadBalancingConfig)) { trace('Discarding address list update with unrecognized config ' + JSON.stringify(lbConfig, undefined, 2)); return; @@ -399,8 +403,8 @@ export class XdsClusterResolver implements LoadBalancer { } } else { const resolver = createResolver({scheme: 'dns', path: mechanism.dns_hostname!}, { - onSuccessfulResolution: addressList => { - mechanismEntry.latestUpdate = getDnsPriorities(addressList); + onSuccessfulResolution: endpointList => { + mechanismEntry.latestUpdate = getDnsPriorities(endpointList); this.maybeUpdateChild(); }, onError: error => { diff --git a/packages/grpc-js-xds/src/load-balancer-xds-wrr-locality.ts b/packages/grpc-js-xds/src/load-balancer-xds-wrr-locality.ts index 8636937f..3ef78939 100644 --- a/packages/grpc-js-xds/src/load-balancer-xds-wrr-locality.ts +++ b/packages/grpc-js-xds/src/load-balancer-xds-wrr-locality.ts @@ -20,13 +20,13 @@ import { LoadBalancingConfig, experimental, logVerbosity } from "@grpc/grpc-js"; import { loadProtosWithOptionsSync } from "@grpc/proto-loader/build/src/util"; import { WeightedTargetRaw } from "./load-balancer-weighted-target"; -import { isLocalitySubchannelAddress } from "./load-balancer-priority"; +import { isLocalityEndpoint } from "./load-balancer-priority"; import { localityToName } from "./load-balancer-xds-cluster-resolver"; import TypedLoadBalancingConfig = experimental.TypedLoadBalancingConfig; import LoadBalancer = experimental.LoadBalancer; import ChannelControlHelper = experimental.ChannelControlHelper; import ChildLoadBalancerHandler = experimental.ChildLoadBalancerHandler; -import SubchannelAddress = experimental.SubchannelAddress; +import Endpoint = experimental.Endpoint; import parseLoadBalancingConfig = experimental.parseLoadBalancingConfig; import registerLoadBalancerType = experimental.registerLoadBalancerType; import { Any__Output } from "./generated/google/protobuf/Any"; @@ -76,14 +76,14 @@ class XdsWrrLocalityLoadBalancer implements LoadBalancer { constructor(private readonly channelControlHelper: ChannelControlHelper) { this.childBalancer = new ChildLoadBalancerHandler(channelControlHelper); } - updateAddressList(addressList: SubchannelAddress[], lbConfig: TypedLoadBalancingConfig, attributes: { [key: string]: unknown; }): void { + updateAddressList(endpointList: Endpoint[], lbConfig: TypedLoadBalancingConfig, attributes: { [key: string]: unknown; }): void { if (!(lbConfig instanceof XdsWrrLocalityLoadBalancingConfig)) { trace('Discarding address list update with unrecognized config ' + JSON.stringify(lbConfig, undefined, 2)); return; } const targets: {[localityName: string]: WeightedTargetRaw} = {}; - for (const address of addressList) { - if (!isLocalitySubchannelAddress(address)) { + for (const address of endpointList) { + if (!isLocalityEndpoint(address)) { return; } const localityName = localityToName(address.locality); @@ -99,7 +99,7 @@ class XdsWrrLocalityLoadBalancer implements LoadBalancer { targets: targets } }; - this.childBalancer.updateAddressList(addressList, parseLoadBalancingConfig(childConfig), attributes); + this.childBalancer.updateAddressList(endpointList, parseLoadBalancingConfig(childConfig), attributes); } exitIdle(): void { this.childBalancer.exitIdle(); diff --git a/packages/grpc-js-xds/test/test-custom-lb-policies.ts b/packages/grpc-js-xds/test/test-custom-lb-policies.ts index 16cca3a8..6994bf39 100644 --- a/packages/grpc-js-xds/test/test-custom-lb-policies.ts +++ b/packages/grpc-js-xds/test/test-custom-lb-policies.ts @@ -30,7 +30,7 @@ import TypedLoadBalancingConfig = experimental.TypedLoadBalancingConfig; import LoadBalancer = experimental.LoadBalancer; import ChannelControlHelper = experimental.ChannelControlHelper; import ChildLoadBalancerHandler = experimental.ChildLoadBalancerHandler; -import SubchannelAddress = experimental.SubchannelAddress; +import Endpoint = experimental.Endpoint; import Picker = experimental.Picker; import PickArgs = experimental.PickArgs; import PickResult = experimental.PickResult; @@ -94,12 +94,12 @@ class RpcBehaviorLoadBalancer implements LoadBalancer { }); this.child = new ChildLoadBalancerHandler(childChannelControlHelper); } - updateAddressList(addressList: SubchannelAddress[], lbConfig: TypedLoadBalancingConfig, attributes: { [key: string]: unknown; }): void { + updateAddressList(endpointList: Endpoint[], lbConfig: TypedLoadBalancingConfig, attributes: { [key: string]: unknown; }): void { if (!(lbConfig instanceof RpcBehaviorLoadBalancingConfig)) { return; } this.latestConfig = lbConfig; - this.child.updateAddressList(addressList, RPC_BEHAVIOR_CHILD_CONFIG, attributes); + this.child.updateAddressList(endpointList, RPC_BEHAVIOR_CHILD_CONFIG, attributes); } exitIdle(): void { this.child.exitIdle(); From 3ff8b674bb6360f686636c33478a0c9ba4feadb7 Mon Sep 17 00:00:00 2001 From: Michael Lumish Date: Wed, 30 Aug 2023 14:57:52 -0700 Subject: [PATCH 3/4] Export HealthListener type in experimental --- packages/grpc-js/src/experimental.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/packages/grpc-js/src/experimental.ts b/packages/grpc-js/src/experimental.ts index 0c7bc75e..870fbe28 100644 --- a/packages/grpc-js/src/experimental.ts +++ b/packages/grpc-js/src/experimental.ts @@ -44,6 +44,7 @@ export { SubchannelInterface, BaseSubchannelWrapper, ConnectivityStateListener, + HealthListener, } from './subchannel-interface'; export { OutlierDetectionRawConfig, From 266af4c19f766ba304213b009b7131b8efd36f4f Mon Sep 17 00:00:00 2001 From: Michael Lumish Date: Wed, 30 Aug 2023 15:16:25 -0700 Subject: [PATCH 4/4] Add pick_first tests --- packages/grpc-js/test/test-pick-first.ts | 48 ++++++++++++++++++++++++ 1 file changed, 48 insertions(+) diff --git a/packages/grpc-js/test/test-pick-first.ts b/packages/grpc-js/test/test-pick-first.ts index 7e862de5..8ab4f4d4 100644 --- a/packages/grpc-js/test/test-pick-first.ts +++ b/packages/grpc-js/test/test-pick-first.ts @@ -125,6 +125,54 @@ describe('pick_first load balancing policy', () => { subchannels[0].transitionToState(ConnectivityState.READY); }); }); + it('Should report READY when a subchannel other than the first connects', done => { + const channelControlHelper = createChildChannelControlHelper( + baseChannelControlHelper, + { + updateState: updateStateCallBackForExpectedStateSequence( + [ConnectivityState.CONNECTING, ConnectivityState.READY], + done + ), + } + ); + const pickFirst = new PickFirstLoadBalancer(channelControlHelper); + pickFirst.updateAddressList( + [ + { addresses: [{ host: 'localhost', port: 1 }] }, + { addresses: [{ host: 'localhost', port: 2 }] }, + ], + config + ); + process.nextTick(() => { + subchannels[1].transitionToState(ConnectivityState.READY); + }); + }); + it('Should report READY when a subchannel other than the first in the same endpoint connects', done => { + const channelControlHelper = createChildChannelControlHelper( + baseChannelControlHelper, + { + updateState: updateStateCallBackForExpectedStateSequence( + [ConnectivityState.CONNECTING, ConnectivityState.READY], + done + ), + } + ); + const pickFirst = new PickFirstLoadBalancer(channelControlHelper); + pickFirst.updateAddressList( + [ + { + addresses: [ + { host: 'localhost', port: 1 }, + { host: 'localhost', port: 2 }, + ], + }, + ], + config + ); + process.nextTick(() => { + subchannels[1].transitionToState(ConnectivityState.READY); + }); + }); it('Should report READY when updated with a subchannel that is already READY', done => { const channelControlHelper = createChildChannelControlHelper( baseChannelControlHelper,