grpc-js: Implement ORCA client-side OOB metrics

This commit is contained in:
Michael Lumish 2025-07-30 15:12:40 -07:00
parent 7e82de7770
commit ede914ddbf
6 changed files with 157 additions and 13 deletions

View File

@ -285,6 +285,8 @@ export {
ServerInterceptor,
} from './server-interceptors';
export { ServerMetricRecorder } from './orca';
import * as experimental from './experimental';
export { experimental };

View File

@ -34,7 +34,7 @@ import {
} from './picker';
import { Endpoint, SubchannelAddress, subchannelAddressToString } from './subchannel-address';
import * as logging from './logging';
import { LogVerbosity } from './constants';
import { LogVerbosity, Status } from './constants';
import {
SubchannelInterface,
ConnectivityStateListener,
@ -44,6 +44,12 @@ import { isTcpSubchannelAddress } from './subchannel-address';
import { isIPv6 } from 'net';
import { ChannelOptions } from './channel-options';
import { StatusOr, statusOrFromValue } from './call-interface';
import { OrcaLoadReport__Output } from './generated/xds/data/orca/v3/OrcaLoadReport';
import { OpenRcaServiceClient } from './generated/xds/service/orca/v3/OpenRcaService';
import { ClientReadableStream, ServiceError } from './call';
import { createOrcaClient } from './orca';
import { msToDuration } from './duration';
import { BackoffTimeout } from './backoff-timeout';
const TRACER_NAME = 'pick_first';
@ -59,6 +65,8 @@ const TYPE_NAME = 'pick_first';
*/
const CONNECTION_DELAY_INTERVAL_MS = 250;
export type MetricsListener = (loadReport: OrcaLoadReport__Output) => void;
export class PickFirstLoadBalancingConfig implements TypedLoadBalancingConfig {
constructor(private readonly shuffleAddressList: boolean) {}
@ -239,6 +247,13 @@ export class PickFirstLoadBalancer implements LoadBalancer {
private latestResolutionNote: string = '';
private metricsListeners: Map<MetricsListener, number> = new Map();
private orcaClient: OpenRcaServiceClient | null = null;
private metricsCall: ClientReadableStream<OrcaLoadReport__Output> | null = null;
private currentMetricsIntervalMs: number = Infinity;
private orcaUnsupported = false;
private metricsBackoffTimer = new BackoffTimeout(() => this.updateMetricsSubscription());
/**
* Load balancer that attempts to connect to each backend in the address list
* in order, and picks the first one that connects, using it for every
@ -336,6 +351,12 @@ export class PickFirstLoadBalancer implements LoadBalancer {
this.currentPick.removeHealthStateWatcher(
this.pickedSubchannelHealthListener
);
this.orcaClient?.close();
this.orcaClient = null;
this.metricsCall?.cancel();
this.metricsCall = null;
this.metricsBackoffTimer.stop();
this.metricsBackoffTimer.reset();
// Unref last, to avoid triggering listeners
this.currentPick.unref();
this.currentPick = null;
@ -439,6 +460,7 @@ export class PickFirstLoadBalancer implements LoadBalancer {
this.currentPick = subchannel;
clearTimeout(this.connectionDelayTimeout);
this.calculateAndReportNewState();
this.updateMetricsSubscription();
}
private updateState(newState: ConnectivityState, picker: Picker, errorMessage: string | null) {
@ -573,6 +595,67 @@ export class PickFirstLoadBalancer implements LoadBalancer {
getTypeName(): string {
return TYPE_NAME;
}
private getOrCreateOrcaClient(): OpenRcaServiceClient | null {
if (this.orcaClient) {
return this.orcaClient;
}
if (this.currentPick) {
const channel = this.currentPick.getChannel();
this.orcaClient = createOrcaClient(channel);
return this.orcaClient;
}
return null;
}
private updateMetricsSubscription() {
if (this.orcaUnsupported) {
return;
}
if (this.metricsListeners.size > 0) {
const newInterval = Math.min(...Array.from(this.metricsListeners.values()));
if (!this.metricsCall || newInterval !== this.currentMetricsIntervalMs) {
const orcaClient = this.getOrCreateOrcaClient();
if (!orcaClient) {
return;
}
this.metricsCall?.cancel();
this.currentMetricsIntervalMs = newInterval;
const metricsCall = orcaClient.streamCoreMetrics({report_interval: msToDuration(newInterval)});
this.metricsCall = metricsCall;
metricsCall.on('data', (report: OrcaLoadReport__Output) => {
this.metricsListeners.forEach((interval, listener) => {
listener(report);
});
});
metricsCall.on('error', (error: ServiceError) => {
this.metricsCall = null;
if (error.code === Status.UNIMPLEMENTED) {
this.orcaUnsupported = true;
return;
}
if (error.code === Status.CANCELLED) {
return;
}
this.metricsBackoffTimer.runOnce();
});
}
} else {
this.metricsCall?.cancel();
this.metricsCall = null;
this.currentMetricsIntervalMs = Infinity;
}
}
addMetricsSubscription(listener: MetricsListener, intervalMs: number): void {
this.metricsListeners.set(listener, intervalMs);
this.updateMetricsSubscription();
}
removeMetricsSubscription(listener: MetricsListener): void {
this.metricsListeners.delete(listener);
this.updateMetricsSubscription();
}
}
const LEAF_CONFIG = new PickFirstLoadBalancingConfig(false);
@ -650,6 +733,14 @@ export class LeafLoadBalancer {
destroy() {
this.pickFirstBalancer.destroy();
}
addMetricsSubscription(listener: MetricsListener, intervalMs: number): void {
this.pickFirstBalancer.addMetricsSubscription(listener, intervalMs);
}
removeMetricsSubscription(listener: MetricsListener): void {
this.pickFirstBalancer.removeMetricsSubscription(listener);
}
}
export function setup(): void {

View File

@ -20,9 +20,11 @@ import { OrcaLoadReport } from "./generated/xds/data/orca/v3/OrcaLoadReport";
import type { loadSync } from '@grpc/proto-loader';
import { ProtoGrpcType as OrcaProtoGrpcType } from "./generated/orca";
import { loadPackageDefinition } from "./make-client";
import { OpenRcaServiceHandlers } from "./generated/xds/service/orca/v3/OpenRcaService";
import { OpenRcaServiceClient, OpenRcaServiceHandlers } from "./generated/xds/service/orca/v3/OpenRcaService";
import { durationMessageToDuration, durationToMs } from "./duration";
import { Server } from "./server";
import { ChannelCredentials } from "./channel-credentials";
import { Channel } from "./channel";
const loadedOrcaProto: OrcaProtoGrpcType | null = null;
function loadOrcaProto(): OrcaProtoGrpcType {
@ -206,3 +208,8 @@ export class ServerMetricRecorder {
server.addService(serviceDefinition, this.serviceImplementation);
}
}
export function createOrcaClient(channel: Channel): OpenRcaServiceClient {
const ClientClass = loadOrcaProto().xds.service.orca.v3.OpenRcaService;
return new ClientClass('unused', ChannelCredentials.createInsecure(), {channelOverride: channel});
}

View File

@ -24,12 +24,13 @@ import { ChannelOptions } from "./channel-options";
import { ChannelRef, ChannelzCallTracker, ChannelzChildrenTracker, ChannelzTrace, registerChannelzChannel, unregisterChannelzRef } from "./channelz";
import { ConnectivityState } from "./connectivity-state";
import { Propagate, Status } from "./constants";
import { restrictControlPlaneStatusCode } from "./control-plane-status";
import { Deadline, getRelativeTimeout } from "./deadline";
import { Metadata } from "./metadata";
import { getDefaultAuthority } from "./resolver";
import { Subchannel } from "./subchannel";
import { SubchannelCall } from "./subchannel-call";
import { GrpcUri, uriToString } from "./uri-parser";
import { GrpcUri, splitHostPort, uriToString } from "./uri-parser";
class SubchannelCallWrapper implements Call {
private childCall: SubchannelCall | null = null;
@ -38,7 +39,20 @@ class SubchannelCallWrapper implements Call {
private readPending = false;
private halfClosePending = false;
private pendingStatus: StatusObject | null = null;
private serviceUrl: string;
constructor(private subchannel: Subchannel, private method: string, private options: CallStreamOptions, private callNumber: number) {
const splitPath: string[] = this.method.split('/');
let serviceName = '';
/* The standard path format is "/{serviceName}/{methodName}", so if we split
* by '/', the first item should be empty and the second should be the
* service name */
if (splitPath.length >= 2) {
serviceName = splitPath[1];
}
const hostname = splitHostPort(this.options.host)?.host ?? 'localhost';
/* Currently, call credentials are only allowed on HTTPS connections, so we
* can assume that the scheme is "https" */
this.serviceUrl = `https://${hostname}/${serviceName}`;
const timeout = getRelativeTimeout(options.deadline);
if (timeout !== Infinity) {
if (timeout <= 0) {
@ -79,16 +93,32 @@ class SubchannelCallWrapper implements Call {
});
return;
}
this.childCall = this.subchannel.createCall(metadata, this.options.host, this.method, listener);
if (this.readPending) {
this.childCall.startRead();
}
if (this.pendingMessage) {
this.childCall.sendMessageWithContext(this.pendingMessage.context, this.pendingMessage.message);
}
if (this.halfClosePending) {
this.childCall.halfClose();
}
this.subchannel.getCallCredentials()
.generateMetadata({method_name: this.method, service_url: this.serviceUrl})
.then(credsMetadata => {
this.childCall = this.subchannel.createCall(credsMetadata, this.options.host, this.method, listener);
if (this.readPending) {
this.childCall.startRead();
}
if (this.pendingMessage) {
this.childCall.sendMessageWithContext(this.pendingMessage.context, this.pendingMessage.message);
}
if (this.halfClosePending) {
this.childCall.halfClose();
}
}, (error: Error & { code: number }) => {
const { code, details } = restrictControlPlaneStatusCode(
typeof error.code === 'number' ? error.code : Status.UNKNOWN,
`Getting metadata from plugin failed with error: ${error.message}`
);
listener.onReceiveStatus(
{
code: code,
details: details,
metadata: new Metadata(),
}
);
});
}
sendMessageWithContext(context: MessageContext, message: Buffer): void {
if (this.childCall) {

View File

@ -16,6 +16,7 @@
*/
import { CallCredentials } from './call-credentials';
import { Channel } from './channel';
import type { SubchannelRef } from './channelz';
import { ConnectivityState } from './connectivity-state';
import { Subchannel } from './subchannel';
@ -67,6 +68,10 @@ export interface SubchannelInterface {
* subchannel.
*/
getCallCredentials(): CallCredentials;
/**
* Get a channel that can be used to make requests with just this
*/
getChannel(): Channel;
}
export abstract class BaseSubchannelWrapper implements SubchannelInterface {
@ -143,4 +148,7 @@ export abstract class BaseSubchannelWrapper implements SubchannelInterface {
getCallCredentials(): CallCredentials {
return this.child.getCallCredentials();
}
getChannel(): Channel {
return this.child.getChannel();
}
}

View File

@ -47,6 +47,8 @@ import { SubchannelCallInterceptingListener } from './subchannel-call';
import { SubchannelCall } from './subchannel-call';
import { CallEventTracker, SubchannelConnector, Transport } from './transport';
import { CallCredentials } from './call-credentials';
import { SingleSubchannelChannel } from './single-subchannel-channel';
import { Channel } from './channel';
const TRACER_NAME = 'subchannel';
@ -519,4 +521,8 @@ export class Subchannel implements SubchannelInterface {
getCallCredentials(): CallCredentials {
return this.secureConnector.getCallCredentials();
}
getChannel(): Channel {
return new SingleSubchannelChannel(this, this.channelTarget, this.options);
}
}