From 83ece61c88b951931e516e60a7fe10de36cea4b3 Mon Sep 17 00:00:00 2001 From: Michael Lumish Date: Wed, 6 Aug 2025 10:50:05 -0700 Subject: [PATCH] grpc-js: Implement weighted round robin --- packages/grpc-js/src/duration.ts | 15 + packages/grpc-js/src/index.ts | 2 + .../src/load-balancer-weighted-round-robin.ts | 452 ++++++++++++++++++ packages/grpc-js/src/priority-queue.ts | 85 ++++ 4 files changed, 554 insertions(+) create mode 100644 packages/grpc-js/src/load-balancer-weighted-round-robin.ts create mode 100644 packages/grpc-js/src/priority-queue.ts diff --git a/packages/grpc-js/src/duration.ts b/packages/grpc-js/src/duration.ts index 390f29f5..843275f2 100644 --- a/packages/grpc-js/src/duration.ts +++ b/packages/grpc-js/src/duration.ts @@ -58,3 +58,18 @@ export function parseDuration(value: string): Duration | null { nanos: match[2] ? Number.parseInt(match[2].padEnd(9, '0'), 10) : 0 }; } + +export function durationToString(duration: Duration): string { + if (duration.nanos === 0) { + return `${duration.seconds}s`; + } + let scaleFactor: number; + if (duration.nanos % 1_000_000 === 0) { + scaleFactor = 1_000_000; + } else if (duration.nanos % 1_000 === 0) { + scaleFactor = 1_000; + } else { + scaleFactor = 1; + } + return `${duration.seconds}.${duration.nanos/scaleFactor}s`; +} diff --git a/packages/grpc-js/src/index.ts b/packages/grpc-js/src/index.ts index 8cac9111..f26f65a1 100644 --- a/packages/grpc-js/src/index.ts +++ b/packages/grpc-js/src/index.ts @@ -296,6 +296,7 @@ import * as resolver_ip from './resolver-ip'; import * as load_balancer_pick_first from './load-balancer-pick-first'; import * as load_balancer_round_robin from './load-balancer-round-robin'; import * as load_balancer_outlier_detection from './load-balancer-outlier-detection'; +import * as load_balancer_weighted_round_robin from './load-balancer-weighted-round-robin'; import * as channelz from './channelz'; import { Deadline } from './deadline'; @@ -306,5 +307,6 @@ import { Deadline } from './deadline'; load_balancer_pick_first.setup(); load_balancer_round_robin.setup(); load_balancer_outlier_detection.setup(); + load_balancer_weighted_round_robin.setup(); channelz.setup(); })(); diff --git a/packages/grpc-js/src/load-balancer-weighted-round-robin.ts b/packages/grpc-js/src/load-balancer-weighted-round-robin.ts new file mode 100644 index 00000000..fd6d161c --- /dev/null +++ b/packages/grpc-js/src/load-balancer-weighted-round-robin.ts @@ -0,0 +1,452 @@ +/* + * Copyright 2025 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +import { StatusOr } from './call-interface'; +import { ChannelOptions } from './channel-options'; +import { ConnectivityState } from './connectivity-state'; +import { LogVerbosity } from './constants'; +import { Duration, durationToMs, durationToString, isDuration, msToDuration, parseDuration } from './duration'; +import { OrcaLoadReport__Output } from './generated/xds/data/orca/v3/OrcaLoadReport'; +import { ChannelControlHelper, createChildChannelControlHelper, LoadBalancer, registerLoadBalancerType, TypedLoadBalancingConfig } from './load-balancer'; +import { LeafLoadBalancer } from './load-balancer-pick-first'; +import * as logging from './logging'; +import { createMetricsReader, MetricsListener } from './orca'; +import { PickArgs, Picker, PickResult, QueuePicker, UnavailablePicker } from './picker'; +import { PriorityQueue } from './priority-queue'; +import { Endpoint, endpointToString } from './subchannel-address'; + +const TRACER_NAME = 'weighted_round_robin'; + +function trace(text: string): void { + logging.trace(LogVerbosity.DEBUG, TRACER_NAME, text); +} + +const TYPE_NAME = 'weighted_round_robin'; + +const DEFAULT_OOB_REPORTING_PERIOD_MS = 10_000; +const DEFAULT_BLACKOUT_PERIOD_MS = 10_000; +const DEFAULT_WEIGHT_EXPIRATION_PERIOD_MS = 3 * 60_000; +const DEFAULT_WEIGHT_UPDATE_PERIOD_MS = 1_000; +const DEFAULT_ERROR_UTILIZATION_PENALTY = 1; + +type TypeofValues = + | 'object' + | 'boolean' + | 'function' + | 'number' + | 'string' + | 'undefined'; + +function validateFieldType( + obj: any, + fieldName: string, + expectedType: TypeofValues +) { + if ( + fieldName in obj && + obj[fieldName] !== undefined && + typeof obj[fieldName] !== expectedType + ) { + throw new Error( + `weighted round robin config ${fieldName} parse error: expected ${expectedType}, got ${typeof obj[ + fieldName + ]}` + ); + } +} + +function parseDurationField(obj: any, fieldName: string): number | null { + if (fieldName in obj && obj[fieldName] !== undefined) { + let durationObject: Duration | null; + if (isDuration(obj[fieldName])) { + durationObject = obj[fieldName]; + } else if (typeof obj[fieldName] === 'string') { + durationObject = parseDuration(obj[fieldName]); + } else { + durationObject = null; + } + if (durationObject) { + return durationToMs(durationObject); + } + } + return null; +} + +class WeightedRoundRobinLoadBalancingConfig implements TypedLoadBalancingConfig { + private readonly enableOobLoadReport: boolean; + private readonly oobLoadReportingPeriodMs: number; + private readonly blackoutPeriodMs: number; + private readonly weightExpirationPeriodMs: number; + private readonly weightUpdatePeriodMs: number; + private readonly errorUtilizationPenalty: number; + + constructor( + enableOobLoadReport: boolean | null, + oobLoadReportingPeriodMs: number | null, + blackoutPeriodMs: number | null, + weightExpirationPeriodMs: number | null, + weightUpdatePeriodMs: number | null, + errorUtilizationPenalty: number | null + ) { + this.enableOobLoadReport = enableOobLoadReport ?? false; + this.oobLoadReportingPeriodMs = oobLoadReportingPeriodMs ?? DEFAULT_OOB_REPORTING_PERIOD_MS; + this.blackoutPeriodMs = blackoutPeriodMs ?? DEFAULT_BLACKOUT_PERIOD_MS; + this.weightExpirationPeriodMs = weightExpirationPeriodMs ?? DEFAULT_WEIGHT_EXPIRATION_PERIOD_MS; + this.weightUpdatePeriodMs = Math.min(weightUpdatePeriodMs ?? DEFAULT_WEIGHT_UPDATE_PERIOD_MS, 100); + this.errorUtilizationPenalty = errorUtilizationPenalty ?? DEFAULT_ERROR_UTILIZATION_PENALTY; + } + + getLoadBalancerName(): string { + return TYPE_NAME; + } + toJsonObject(): object { + return { + enable_oob_load_report: this.enableOobLoadReport, + oob_load_reporting_period: durationToString(msToDuration(this.oobLoadReportingPeriodMs)), + blackout_period: durationToString(msToDuration(this.blackoutPeriodMs)), + weight_expiration_period: durationToString(msToDuration(this.weightExpirationPeriodMs)), + weight_update_period: durationToString(msToDuration(this.weightUpdatePeriodMs)), + error_utilization_penalty: this.errorUtilizationPenalty + }; + } + static createFromJson(obj: any): WeightedRoundRobinLoadBalancingConfig { + validateFieldType(obj, 'enable_oob_load_report', 'boolean'); + validateFieldType(obj, 'error_utilization_penalty', 'number'); + if (obj.error_utilization_penalty < 0) { + throw new Error('weighted round robin config error_utilization_penalty < 0'); + } + return new WeightedRoundRobinLoadBalancingConfig( + obj.enable_oob_load_report, + parseDurationField(obj, 'oob_load_reporting_period'), + parseDurationField(obj, 'blackout_period'), + parseDurationField(obj, 'weight_expiration_period'), + parseDurationField(obj, 'weight_update_period'), + obj.error_utilization_penalty + ) + } + + getEnableOobLoadReport() { + return this.enableOobLoadReport; + } + getOobLoadReportingPeriodMs() { + return this.oobLoadReportingPeriodMs; + } + getBlackoutPeriodMs() { + return this.blackoutPeriodMs; + } + getWeightExpirationPeriodMs() { + return this.weightExpirationPeriodMs; + } + getWeightUpdatePeriodMs() { + return this.weightUpdatePeriodMs; + } + getErrorUtilizationPenalty() { + return this.errorUtilizationPenalty; + } +} + +interface WeightedPicker { + endpointName: string; + picker: Picker; + weight: number; +} + +interface QueueEntry { + endpointName: string; + picker: Picker; + period: number; + deadline: number; +} + +type MetricsHandler = (loadReport: OrcaLoadReport__Output, endpointName: string) => void; + +class WeightedRoundRobinPicker implements Picker { + private queue: PriorityQueue = new PriorityQueue((a, b) => a.deadline < b.deadline); + constructor(children: WeightedPicker[], private readonly metricsHandler: MetricsHandler | null) { + for (const child of children) { + const period = 1 / child.weight; + this.queue.push({ + endpointName: child.endpointName, + picker: child.picker, + period: period, + deadline: Math.random() * period + }); + } + } + pick(pickArgs: PickArgs): PickResult { + const entry = this.queue.pop(); + this.queue.push({ + ...entry, + deadline: entry.deadline + entry.period + }) + const childPick = entry.picker.pick(pickArgs); + if (this.metricsHandler) { + return { + ...childPick, + onCallEnded: createMetricsReader(loadReport => this.metricsHandler!(loadReport, entry.endpointName), childPick.onCallEnded) + }; + } else { + return childPick; + } + } +} + +interface ChildEntry { + child: LeafLoadBalancer; + lastUpdated: Date; + nonEmptySince: Date | null; + weight: number; + oobMetricsListener: MetricsListener | null; +} + +class WeightedRoundRobinLoadBalancer implements LoadBalancer { + private latestConfig: WeightedRoundRobinLoadBalancingConfig | null = null; + + private children: Map = new Map(); + + private currentState: ConnectivityState = ConnectivityState.IDLE; + + private updatesPaused = false; + + private lastError: string | null = null; + + constructor(private readonly channelControlHelper: ChannelControlHelper) {} + + private countChildrenWithState(state: ConnectivityState) { + let count = 0; + for (const entry of this.children.values()) { + if (entry.child.getConnectivityState() === state) { + count += 1; + } + } + return count; + } + + updateWeight(entry: ChildEntry, loadReport: OrcaLoadReport__Output): void { + const qps = loadReport.rps_fractional; + let utilization = loadReport.application_utilization; + if (utilization > 0 && qps > 0) { + utilization += (loadReport.eps / qps) * (this.latestConfig?.getErrorUtilizationPenalty() ?? 0); + } + const newWeight = utilization === 0 ? 0 : qps / utilization; + if (newWeight === 0) { + return; + } + const now = new Date(); + if (entry.nonEmptySince === null) { + entry.nonEmptySince = now; + } + entry.lastUpdated = now; + entry.weight = newWeight; + } + + getWeight(entry: ChildEntry): number { + if (!this.latestConfig) { + return 0; + } + const now = new Date().getTime(); + if (now - entry.lastUpdated.getTime() >= this.latestConfig.getWeightExpirationPeriodMs()) { + entry.nonEmptySince = null; + return 0; + } + const blackoutPeriod = this.latestConfig.getBlackoutPeriodMs(); + if (blackoutPeriod > 0 && (entry.nonEmptySince === null || now - entry.nonEmptySince.getTime() < blackoutPeriod)) { + return 0; + } + return entry.weight; + } + + private calculateAndUpdateState() { + if (this.updatesPaused || !this.latestConfig) { + return; + } + if (this.countChildrenWithState(ConnectivityState.READY) > 0) { + const weightedPickers: WeightedPicker[] = []; + for (const [endpoint, entry] of this.children) { + if (entry.child.getConnectivityState() !== ConnectivityState.READY) { + continue; + } + if (entry.weight > 0) { + weightedPickers.push({ + endpointName: endpoint, + picker: entry.child.getPicker(), + weight: this.getWeight(entry) + }); + } + } + let metricsHandler: MetricsHandler | null; + if (this.latestConfig.getEnableOobLoadReport()) { + metricsHandler = (loadReport, endpointName) => { + const childEntry = this.children.get(endpointName); + if (childEntry) { + this.updateWeight(childEntry, loadReport); + } + }; + } else { + metricsHandler = null; + } + this.updateState( + ConnectivityState.READY, + new WeightedRoundRobinPicker( + weightedPickers, + metricsHandler + ), + null + ); + } else if (this.countChildrenWithState(ConnectivityState.CONNECTING) > 0) { + this.updateState(ConnectivityState.CONNECTING, new QueuePicker(this), null); + } else if ( + this.countChildrenWithState(ConnectivityState.TRANSIENT_FAILURE) > 0 + ) { + const errorMessage = `weighted_round_robin: No connection established. Last error: ${this.lastError}`; + this.updateState( + ConnectivityState.TRANSIENT_FAILURE, + new UnavailablePicker({ + details: errorMessage, + }), + errorMessage + ); + } else { + this.updateState(ConnectivityState.IDLE, new QueuePicker(this), null); + } + /* round_robin should keep all children connected, this is how we do that. + * We can't do this more efficiently in the individual child's updateState + * callback because that doesn't have a reference to which child the state + * change is associated with. */ + for (const {child} of this.children.values()) { + if (child.getConnectivityState() === ConnectivityState.IDLE) { + child.exitIdle(); + } + } + } + + private updateState(newState: ConnectivityState, picker: Picker, errorMessage: string | null) { + trace( + ConnectivityState[this.currentState] + + ' -> ' + + ConnectivityState[newState] + ); + this.currentState = newState; + this.channelControlHelper.updateState(newState, picker, errorMessage); + } + + updateAddressList(maybeEndpointList: StatusOr, lbConfig: TypedLoadBalancingConfig, options: ChannelOptions, resolutionNote: string): boolean { + if (!(lbConfig instanceof WeightedRoundRobinLoadBalancingConfig)) { + return false; + } + if (!maybeEndpointList.ok) { + if (this.children.size === 0) { + this.updateState( + ConnectivityState.TRANSIENT_FAILURE, + new UnavailablePicker(maybeEndpointList.error), + maybeEndpointList.error.details + ); + } + return true; + } + const now = new Date(); + const seenEndpointNames = new Set(); + this.updatesPaused = true; + for (const endpoint of maybeEndpointList.value) { + const name = endpointToString(endpoint); + seenEndpointNames.add(name); + let entry = this.children.get(name); + if (entry) { + if (lbConfig.getEnableOobLoadReport()) { + if (!this.latestConfig || !this.latestConfig.getEnableOobLoadReport() || lbConfig.getOobLoadReportingPeriodMs() !== this.latestConfig.getOobLoadReportingPeriodMs()) { + if (!entry.oobMetricsListener) { + entry.oobMetricsListener = loadReport => { + this.updateWeight(entry!, loadReport); + }; + } + entry.child.addMetricsSubscription(entry.oobMetricsListener, lbConfig.getOobLoadReportingPeriodMs()); + } + } else { + if (entry.oobMetricsListener) { + entry.child.removeMetricsSubscription(entry.oobMetricsListener); + entry.oobMetricsListener = null; + } + } + } else { + entry = { + child: new LeafLoadBalancer(endpoint, createChildChannelControlHelper(this.channelControlHelper, { + updateState: (connectivityState, picker, errorMessage) => { + /* Ensure that name resolution is requested again after active + * connections are dropped. This is more aggressive than necessary to + * accomplish that, so we are counting on resolvers to have + * reasonable rate limits. */ + if (this.currentState === ConnectivityState.READY && connectivityState !== ConnectivityState.READY) { + this.channelControlHelper.requestReresolution(); + } + if (connectivityState === ConnectivityState.READY) { + entry!.nonEmptySince = null; + } + if (errorMessage) { + this.lastError = errorMessage; + } + this.calculateAndUpdateState(); + }, + }), options, resolutionNote), + lastUpdated: now, + nonEmptySince: null, + weight: 0, + oobMetricsListener: null + }; + if (lbConfig.getEnableOobLoadReport()) { + entry.oobMetricsListener = loadReport => { + this.updateWeight(entry!, loadReport); + }; + entry.child.addMetricsSubscription(entry.oobMetricsListener, lbConfig.getOobLoadReportingPeriodMs()); + } + } + } + for (const [endpointName, entry] of this.children) { + if (!seenEndpointNames.has(endpointName)) { + entry.child.destroy(); + this.children.delete(endpointName); + } + } + this.updatesPaused = false; + this.calculateAndUpdateState(); + return true; + } + exitIdle(): void { + /* The weighted_round_robin LB policy is only in the IDLE state if it has + * no addresses to try to connect to and it has no picked subchannel. + * In that case, there is no meaningful action that can be taken here. */ + } + resetBackoff(): void { + // This LB policy has no backoff to reset + } + destroy(): void { + for (const entry of this.children.values()) { + entry.child.destroy(); + } + this.children.clear(); + } + getTypeName(): string { + return TYPE_NAME; + } +} + +export function setup() { + registerLoadBalancerType( + TYPE_NAME, + WeightedRoundRobinLoadBalancer, + WeightedRoundRobinLoadBalancingConfig + ); +} diff --git a/packages/grpc-js/src/priority-queue.ts b/packages/grpc-js/src/priority-queue.ts new file mode 100644 index 00000000..6973cf38 --- /dev/null +++ b/packages/grpc-js/src/priority-queue.ts @@ -0,0 +1,85 @@ +/* + * Copyright 2025 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +// Implementation adapted from https://stackoverflow.com/a/42919752/159388 + +const top = 0; +const parent = (i: number) => Math.floor(i / 2); +const left = (i: number) => i * 2 + 1; +const right = (i: number) => i * 2 + 2; + +export class PriorityQueue { + private readonly heap: T[] = []; + constructor(private readonly comparator = (a: T, b: T) => a > b) {} + + size(): number { + return this.heap.length; + } + isEmpty(): boolean { + return this.size() == 0; + } + peek(): T { + return this.heap[top]; + } + push(...values: T[]) { + values.forEach(value => { + this.heap.push(value); + this.siftUp(); + }); + return this.size(); + } + pop(): T { + const poppedValue = this.peek(); + const bottom = this.size() - 1; + if (bottom > top) { + this.swap(top, bottom); + } + this.heap.pop(); + this.siftDown(); + return poppedValue; + } + replace(value: T): T { + const replacedValue = this.peek(); + this.heap[top] = value; + this.siftDown(); + return replacedValue; + } + private greater(i: number, j: number): boolean { + return this.comparator(this.heap[i], this.heap[j]); + } + private swap(i: number, j: number): void { + [this.heap[i], this.heap[j]] = [this.heap[j], this.heap[i]]; + } + private siftUp(): void { + let node = this.size() - 1; + while (node > top && this.greater(node, parent(node))) { + this.swap(node, parent(node)); + node = parent(node); + } + } + private siftDown(): void { + let node = top; + while ( + (left(node) < this.size() && this.greater(left(node), node)) || + (right(node) < this.size() && this.greater(right(node), node)) + ) { + let maxChild = (right(node) < this.size() && this.greater(right(node), left(node))) ? right(node) : left(node); + this.swap(node, maxChild); + node = maxChild; + } + } +}