Merge branch 'master' into grpc-js-xds_wrr

This commit is contained in:
Michael Lumish 2025-08-22 15:15:39 -07:00
commit 1ec5996769
9 changed files with 181 additions and 130 deletions

View File

@ -34,16 +34,16 @@
"devDependencies": {
"@grpc/grpc-js": "file:../grpc-js",
"@grpc/proto-loader": "file:../proto-loader",
"@grpc/reflection": "file:../grpc-reflection",
"@types/gulp": "^4.0.6",
"@types/gulp-mocha": "0.0.32",
"@types/mocha": "^5.2.6",
"@types/node": ">=20.11.20",
"@grpc/reflection": "file:../grpc-reflection",
"@types/yargs": "^15.0.5",
"find-free-ports": "^3.1.1",
"grpc-health-check": "file:../grpc-health-check",
"gts": "^5.0.1",
"ncp": "^2.0.0",
"portfinder": "^1.0.37",
"typescript": "^5.1.3",
"yargs": "^15.4.1"
},

View File

@ -21,11 +21,9 @@ import { ProtoGrpcType } from "./generated/echo";
import { EchoRequest__Output } from "./generated/grpc/testing/EchoRequest";
import { EchoResponse } from "./generated/grpc/testing/EchoResponse";
import * as net from 'net';
import { XdsServer } from "../src";
import { ControlPlaneServer } from "./xds-server";
import { findFreePorts } from 'find-free-ports';
import { XdsServerCredentials } from "../src/xds-credentials";
import { getPortsPromise } from 'portfinder';
const loadedProtos = loadPackageDefinition(loadSync(
[
@ -148,6 +146,6 @@ export class Backend {
}
export async function createBackends(count: number, useXdsServer?: boolean, creds?: ServerCredentials | undefined, serverOptions?: ServerOptions): Promise<Backend[]> {
const ports = await findFreePorts(count);
const ports = await getPortsPromise(count);
return ports.map(port => new Backend(port, useXdsServer ?? true, creds, serverOptions));
}

View File

@ -34,7 +34,7 @@ import {
} from './picker';
import { Endpoint, SubchannelAddress, subchannelAddressToString } from './subchannel-address';
import * as logging from './logging';
import { LogVerbosity, Status } from './constants';
import { LogVerbosity } from './constants';
import {
SubchannelInterface,
ConnectivityStateListener,
@ -44,12 +44,6 @@ import { isTcpSubchannelAddress } from './subchannel-address';
import { isIPv6 } from 'net';
import { ChannelOptions } from './channel-options';
import { StatusOr, statusOrFromValue } from './call-interface';
import { OrcaLoadReport__Output } from './generated/xds/data/orca/v3/OrcaLoadReport';
import { OpenRcaServiceClient } from './generated/xds/service/orca/v3/OpenRcaService';
import { ClientReadableStream, ServiceError } from './call';
import { createOrcaClient, MetricsListener } from './orca';
import { msToDuration } from './duration';
import { BackoffTimeout } from './backoff-timeout';
const TRACER_NAME = 'pick_first';
@ -245,13 +239,6 @@ export class PickFirstLoadBalancer implements LoadBalancer {
private latestResolutionNote: string = '';
private metricsListeners: Map<MetricsListener, number> = new Map();
private orcaClient: OpenRcaServiceClient | null = null;
private metricsCall: ClientReadableStream<OrcaLoadReport__Output> | null = null;
private currentMetricsIntervalMs: number = Infinity;
private orcaUnsupported = false;
private metricsBackoffTimer = new BackoffTimeout(() => this.updateMetricsSubscription());
/**
* Load balancer that attempts to connect to each backend in the address list
* in order, and picks the first one that connects, using it for every
@ -349,12 +336,6 @@ export class PickFirstLoadBalancer implements LoadBalancer {
this.currentPick.removeHealthStateWatcher(
this.pickedSubchannelHealthListener
);
this.orcaClient?.close();
this.orcaClient = null;
this.metricsCall?.cancel();
this.metricsCall = null;
this.metricsBackoffTimer.stop();
this.metricsBackoffTimer.reset();
// Unref last, to avoid triggering listeners
this.currentPick.unref();
this.currentPick = null;
@ -458,7 +439,6 @@ export class PickFirstLoadBalancer implements LoadBalancer {
this.currentPick = subchannel;
clearTimeout(this.connectionDelayTimeout);
this.calculateAndReportNewState();
this.updateMetricsSubscription();
}
private updateState(newState: ConnectivityState, picker: Picker, errorMessage: string | null) {
@ -588,77 +568,11 @@ export class PickFirstLoadBalancer implements LoadBalancer {
destroy() {
this.resetSubchannelList();
this.removeCurrentPick();
this.metricsCall?.cancel();
this.metricsCall = null;
this.orcaClient?.close();
this.orcaClient = null;
this.metricsBackoffTimer.stop();
}
getTypeName(): string {
return TYPE_NAME;
}
private getOrCreateOrcaClient(): OpenRcaServiceClient | null {
if (this.orcaClient) {
return this.orcaClient;
}
if (this.currentPick) {
const channel = this.currentPick.getChannel();
this.orcaClient = createOrcaClient(channel);
return this.orcaClient;
}
return null;
}
private updateMetricsSubscription() {
if (this.orcaUnsupported) {
return;
}
if (this.metricsListeners.size > 0) {
const newInterval = Math.min(...Array.from(this.metricsListeners.values()));
if (!this.metricsCall || newInterval !== this.currentMetricsIntervalMs) {
const orcaClient = this.getOrCreateOrcaClient();
if (!orcaClient) {
return;
}
this.metricsCall?.cancel();
this.currentMetricsIntervalMs = newInterval;
const metricsCall = orcaClient.streamCoreMetrics({report_interval: msToDuration(newInterval)});
this.metricsCall = metricsCall;
metricsCall.on('data', (report: OrcaLoadReport__Output) => {
this.metricsListeners.forEach((interval, listener) => {
listener(report);
});
});
metricsCall.on('error', (error: ServiceError) => {
this.metricsCall = null;
if (error.code === Status.UNIMPLEMENTED) {
this.orcaUnsupported = true;
return;
}
if (error.code === Status.CANCELLED) {
return;
}
this.metricsBackoffTimer.runOnce();
});
}
} else {
this.metricsCall?.cancel();
this.metricsCall = null;
this.currentMetricsIntervalMs = Infinity;
}
}
addMetricsSubscription(listener: MetricsListener, intervalMs: number): void {
this.metricsListeners.set(listener, intervalMs);
this.updateMetricsSubscription();
}
removeMetricsSubscription(listener: MetricsListener): void {
this.metricsListeners.delete(listener);
this.updateMetricsSubscription();
}
}
const LEAF_CONFIG = new PickFirstLoadBalancingConfig(false);
@ -736,14 +650,6 @@ export class LeafLoadBalancer {
destroy() {
this.pickFirstBalancer.destroy();
}
addMetricsSubscription(listener: MetricsListener, intervalMs: number): void {
this.pickFirstBalancer.addMetricsSubscription(listener, intervalMs);
}
removeMetricsSubscription(listener: MetricsListener): void {
this.pickFirstBalancer.removeMetricsSubscription(listener);
}
}
export function setup(): void {

View File

@ -24,7 +24,7 @@ import { OrcaLoadReport__Output } from './generated/xds/data/orca/v3/OrcaLoadRep
import { ChannelControlHelper, createChildChannelControlHelper, LoadBalancer, registerLoadBalancerType, TypedLoadBalancingConfig } from './load-balancer';
import { LeafLoadBalancer } from './load-balancer-pick-first';
import * as logging from './logging';
import { createMetricsReader, MetricsListener } from './orca';
import { createMetricsReader, MetricsListener, OrcaOobMetricsSubchannelWrapper } from './orca';
import { PickArgs, Picker, PickResult, QueuePicker, UnavailablePicker } from './picker';
import { PriorityQueue } from './priority-queue';
import { Endpoint, endpointToString } from './subchannel-address';
@ -387,27 +387,12 @@ class WeightedRoundRobinLoadBalancer implements LoadBalancer {
const now = new Date();
const seenEndpointNames = new Set<string>();
this.updatesPaused = true;
this.latestConfig = lbConfig;
for (const endpoint of maybeEndpointList.value) {
const name = endpointToString(endpoint);
seenEndpointNames.add(name);
let entry = this.children.get(name);
if (entry) {
if (lbConfig.getEnableOobLoadReport()) {
if (!this.latestConfig || !this.latestConfig.getEnableOobLoadReport() || lbConfig.getOobLoadReportingPeriodMs() !== this.latestConfig.getOobLoadReportingPeriodMs()) {
if (!entry.oobMetricsListener) {
entry.oobMetricsListener = loadReport => {
this.updateWeight(entry!, loadReport);
};
}
entry.child.addMetricsSubscription(entry.oobMetricsListener, lbConfig.getOobLoadReportingPeriodMs());
}
} else {
if (entry.oobMetricsListener) {
entry.child.removeMetricsSubscription(entry.oobMetricsListener);
entry.oobMetricsListener = null;
}
}
} else {
if (!entry) {
entry = {
child: new LeafLoadBalancer(endpoint, createChildChannelControlHelper(this.channelControlHelper, {
updateState: (connectivityState, picker, errorMessage) => {
@ -426,20 +411,29 @@ class WeightedRoundRobinLoadBalancer implements LoadBalancer {
}
this.calculateAndUpdateState();
},
createSubchannel: (subchannelAddress, subchannelArgs) => {
const subchannel = this.channelControlHelper.createSubchannel(subchannelAddress, subchannelArgs);
if (entry?.oobMetricsListener) {
return new OrcaOobMetricsSubchannelWrapper(subchannel, entry.oobMetricsListener, this.latestConfig!.getOobLoadReportingPeriodMs());
} else {
return subchannel;
}
}
}), options, resolutionNote),
lastUpdated: now,
nonEmptySince: null,
weight: 0,
oobMetricsListener: null
};
if (lbConfig.getEnableOobLoadReport()) {
entry.oobMetricsListener = loadReport => {
this.updateWeight(entry!, loadReport);
};
entry.child.addMetricsSubscription(entry.oobMetricsListener, lbConfig.getOobLoadReportingPeriodMs());
}
this.children.set(name, entry);
}
if (lbConfig.getEnableOobLoadReport()) {
entry.oobMetricsListener = loadReport => {
this.updateWeight(entry!, loadReport);
};
} else {
entry.oobMetricsListener = null;
}
}
for (const [endpointName, entry] of this.children) {
if (seenEndpointNames.has(endpointName)) {
@ -449,7 +443,6 @@ class WeightedRoundRobinLoadBalancer implements LoadBalancer {
this.children.delete(endpointName);
}
}
this.latestConfig = lbConfig;
this.updatesPaused = false;
this.calculateAndUpdateState();
if (this.weightUpdateTimer) {

View File

@ -21,11 +21,17 @@ import type { loadSync } from '@grpc/proto-loader';
import { ProtoGrpcType as OrcaProtoGrpcType } from "./generated/orca";
import { loadPackageDefinition } from "./make-client";
import { OpenRcaServiceClient, OpenRcaServiceHandlers } from "./generated/xds/service/orca/v3/OpenRcaService";
import { durationMessageToDuration, durationToMs } from "./duration";
import { durationMessageToDuration, durationToMs, msToDuration } from "./duration";
import { Server } from "./server";
import { ChannelCredentials } from "./channel-credentials";
import { Channel } from "./channel";
import { OnCallEnded } from "./picker";
import { DataProducer, Subchannel } from "./subchannel";
import { BaseSubchannelWrapper, DataWatcher, SubchannelInterface } from "./subchannel-interface";
import { ClientReadableStream, ServiceError } from "./call";
import { Status } from "./constants";
import { BackoffTimeout } from "./backoff-timeout";
import { ConnectivityState } from "./connectivity-state";
const loadedOrcaProto: OrcaProtoGrpcType | null = null;
function loadOrcaProto(): OrcaProtoGrpcType {
@ -246,3 +252,94 @@ export function createMetricsReader(listener: MetricsListener, previousOnCallEnd
}
}
}
const DATA_PRODUCER_KEY = 'orca_oob_metrics';
class OobMetricsDataWatcher implements DataWatcher {
private dataProducer: DataProducer | null = null;
constructor(private metricsListener: MetricsListener, private intervalMs: number) {}
setSubchannel(subchannel: Subchannel): void {
const producer = subchannel.getOrCreateDataProducer(DATA_PRODUCER_KEY, createOobMetricsDataProducer);
this.dataProducer = producer;
producer.addDataWatcher(this);
}
destroy(): void {
this.dataProducer?.removeDataWatcher(this);
}
getInterval(): number {
return this.intervalMs;
}
onMetricsUpdate(metrics: OrcaLoadReport__Output) {
this.metricsListener(metrics);
}
}
class OobMetricsDataProducer implements DataProducer {
private dataWatchers: Set<OobMetricsDataWatcher> = new Set();
private orcaSupported = true;
private client: OpenRcaServiceClient;
private metricsCall: ClientReadableStream<OrcaLoadReport__Output> | null = null;
private currentInterval = Infinity;
private backoffTimer = new BackoffTimeout(() => this.updateMetricsSubscription());
private subchannelStateListener = () => this.updateMetricsSubscription();
constructor(private subchannel: Subchannel) {
const channel = subchannel.getChannel();
this.client = createOrcaClient(channel);
subchannel.addConnectivityStateListener(this.subchannelStateListener);
}
addDataWatcher(dataWatcher: OobMetricsDataWatcher): void {
this.dataWatchers.add(dataWatcher);
this.updateMetricsSubscription();
}
removeDataWatcher(dataWatcher: OobMetricsDataWatcher): void {
this.dataWatchers.delete(dataWatcher);
if (this.dataWatchers.size === 0) {
this.subchannel.removeDataProducer(DATA_PRODUCER_KEY);
this.metricsCall?.cancel();
this.metricsCall = null;
this.client.close();
this.subchannel.removeConnectivityStateListener(this.subchannelStateListener);
} else {
this.updateMetricsSubscription();
}
}
private updateMetricsSubscription() {
if (this.dataWatchers.size === 0 || !this.orcaSupported || this.subchannel.getConnectivityState() !== ConnectivityState.READY) {
return;
}
const newInterval = Math.min(...Array.from(this.dataWatchers).map(watcher => watcher.getInterval()));
if (!this.metricsCall || newInterval !== this.currentInterval) {
this.metricsCall?.cancel();
this.currentInterval = newInterval;
const metricsCall = this.client.streamCoreMetrics({report_interval: msToDuration(newInterval)});
this.metricsCall = metricsCall;
metricsCall.on('data', (report: OrcaLoadReport__Output) => {
this.dataWatchers.forEach(watcher => {
watcher.onMetricsUpdate(report);
});
});
metricsCall.on('error', (error: ServiceError) => {
this.metricsCall = null;
if (error.code === Status.UNIMPLEMENTED) {
this.orcaSupported = false;
return;
}
if (error.code === Status.CANCELLED) {
return;
}
this.backoffTimer.runOnce();
});
}
}
}
export class OrcaOobMetricsSubchannelWrapper extends BaseSubchannelWrapper {
constructor(child: SubchannelInterface, metricsListener: MetricsListener, intervalMs: number) {
super(child);
this.addDataWatcher(new OobMetricsDataWatcher(metricsListener, intervalMs));
}
}
function createOobMetricsDataProducer(subchannel: Subchannel) {
return new OobMetricsDataProducer(subchannel);
}

View File

@ -31,6 +31,11 @@ export type ConnectivityStateListener = (
export type HealthListener = (healthy: boolean) => void;
export interface DataWatcher {
setSubchannel(subchannel: Subchannel): void;
destroy(): void;
}
/**
* This is an interface for load balancing policies to use to interact with
* subchannels. This allows load balancing policies to wrap and unwrap
@ -53,6 +58,7 @@ export interface SubchannelInterface {
isHealthy(): boolean;
addHealthStateWatcher(listener: HealthListener): void;
removeHealthStateWatcher(listener: HealthListener): void;
addDataWatcher(dataWatcher: DataWatcher): void;
/**
* If this is a wrapper, return the wrapped subchannel, otherwise return this
*/
@ -77,6 +83,8 @@ export interface SubchannelInterface {
export abstract class BaseSubchannelWrapper implements SubchannelInterface {
private healthy = true;
private healthListeners: Set<HealthListener> = new Set();
private refcount = 0;
private dataWatchers: Set<DataWatcher> = new Set();
constructor(protected child: SubchannelInterface) {
child.addHealthStateWatcher(childHealthy => {
/* A change to the child health state only affects this wrapper's overall
@ -113,9 +121,19 @@ export abstract class BaseSubchannelWrapper implements SubchannelInterface {
}
ref(): void {
this.child.ref();
this.refcount += 1;
}
unref(): void {
this.child.unref();
this.refcount -= 1;
if (this.refcount === 0) {
this.destroy();
}
}
protected destroy() {
for (const watcher of this.dataWatchers) {
watcher.destroy();
}
}
getChannelzRef(): SubchannelRef {
return this.child.getChannelzRef();
@ -129,6 +147,10 @@ export abstract class BaseSubchannelWrapper implements SubchannelInterface {
removeHealthStateWatcher(listener: HealthListener): void {
this.healthListeners.delete(listener);
}
addDataWatcher(dataWatcher: DataWatcher): void {
dataWatcher.setSubchannel(this.getRealSubchannel());
this.dataWatchers.add(dataWatcher);
}
protected setHealthy(healthy: boolean): void {
if (healthy !== this.healthy) {
this.healthy = healthy;

View File

@ -41,6 +41,7 @@ import {
} from './channelz';
import {
ConnectivityStateListener,
DataWatcher,
SubchannelInterface,
} from './subchannel-interface';
import { SubchannelCallInterceptingListener } from './subchannel-call';
@ -57,6 +58,11 @@ const TRACER_NAME = 'subchannel';
* to calculate it */
const KEEPALIVE_MAX_TIME_MS = ~(1 << 31);
export interface DataProducer {
addDataWatcher(dataWatcher: DataWatcher): void;
removeDataWatcher(dataWatcher: DataWatcher): void;
}
export class Subchannel implements SubchannelInterface {
/**
* The subchannel's current connectivity state. Invariant: `session` === `null`
@ -107,6 +113,10 @@ export class Subchannel implements SubchannelInterface {
private secureConnector: SecureConnector;
private dataProducers: Map<string, DataProducer> = new Map();
private subchannelChannel: Channel | null = null;
/**
* A class representing a connection to a single backend.
* @param channelTarget The target string for the channel as a whole
@ -523,6 +533,27 @@ export class Subchannel implements SubchannelInterface {
}
getChannel(): Channel {
return new SingleSubchannelChannel(this, this.channelTarget, this.options);
if (!this.subchannelChannel) {
this.subchannelChannel = new SingleSubchannelChannel(this, this.channelTarget, this.options);
}
return this.subchannelChannel;
}
addDataWatcher(dataWatcher: DataWatcher): void {
throw new Error('Not implemented');
}
getOrCreateDataProducer(name: string, createDataProducer: (subchannel: Subchannel) => DataProducer): DataProducer {
const existingProducer = this.dataProducers.get(name);
if (existingProducer){
return existingProducer;
}
const newProducer = createDataProducer(this);
this.dataProducers.set(name, newProducer);
return newProducer;
}
removeDataProducer(name: string) {
this.dataProducers.delete(name);
}
}

View File

@ -30,6 +30,7 @@ import {
} from '../src/make-client';
import { readFileSync } from 'fs';
import {
DataWatcher,
HealthListener,
SubchannelInterface,
} from '../src/subchannel-interface';
@ -205,6 +206,9 @@ export class MockSubchannel implements SubchannelInterface {
) {
this.state = initialState;
}
addDataWatcher(dataWatcher: DataWatcher): void {
throw new Error('Method not implemented.');
}
getConnectivityState(): grpc.connectivityState {
return this.state;
}

View File

@ -278,7 +278,7 @@ describe('Weighted round robin LB policy', () => {
await makeNCalls(client, 10);
await asyncTimeout(200);
const result = await makeNCalls(client, 40);
assert(Math.abs(result['1'] - 30) < 2, `server1: ${result['1']}, server2: ${result['2']}`);
assert(Math.abs(result['1'] - 30) < 3, `server1: ${result['1']}, server2: ${result['2']}`);
});
// Calls aren't fast enough for this to work consistently
it.skip('Should wait for the blackout period to apply weights', async () => {
@ -430,7 +430,7 @@ describe('Weighted round robin LB policy', () => {
await makeNCalls(client, 10);
await asyncTimeout(200);
const result = await makeNCalls(client, 40);
assert(Math.abs(result['1'] - 30) < 2, `server1: ${result['1']}, server2: ${result['2']}`);
assert(Math.abs(result['1'] - 30) < 3, `server1: ${result['1']}, server2: ${result['2']}`);
});
});
});