Merge pull request #2986 from murgatroid99/grpc-js_client_inline_metrics

grpc-js: Implement ORCA client-side per-call metrics
This commit is contained in:
Michael Lumish 2025-08-01 14:52:05 -07:00 committed by GitHub
commit 4799bdebed
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 69 additions and 15 deletions

View File

@ -179,8 +179,8 @@ class XdsClusterImplPicker implements Picker {
pickSubchannel?.getStatsObject()?.addCallStarted();
callCounterMap.startCall(this.callCounterMapKey);
},
onCallEnded: status => {
originalPick.onCallEnded?.(status);
onCallEnded: (status, details, metadata) => {
originalPick.onCallEnded?.(status, details, metadata);
pickSubchannel?.getStatsObject()?.addCallFinished(status !== Status.OK)
callCounterMap.endCall(this.callCounterMapKey);
}

View File

@ -428,13 +428,13 @@ class OutlierDetectionPicker implements Picker {
if (mapEntry) {
let onCallEnded = wrappedPick.onCallEnded;
if (this.countCalls) {
onCallEnded = statusCode => {
onCallEnded = (statusCode, details, metadata) => {
if (statusCode === Status.OK) {
mapEntry.counter.addSuccess();
} else {
mapEntry.counter.addFailure();
}
wrappedPick.onCallEnded?.(statusCode);
wrappedPick.onCallEnded?.(statusCode, details, metadata);
};
}
return {

View File

@ -47,7 +47,7 @@ import { StatusOr, statusOrFromValue } from './call-interface';
import { OrcaLoadReport__Output } from './generated/xds/data/orca/v3/OrcaLoadReport';
import { OpenRcaServiceClient } from './generated/xds/service/orca/v3/OpenRcaService';
import { ClientReadableStream, ServiceError } from './call';
import { createOrcaClient } from './orca';
import { createOrcaClient, MetricsListener } from './orca';
import { msToDuration } from './duration';
import { BackoffTimeout } from './backoff-timeout';
@ -65,8 +65,6 @@ const TYPE_NAME = 'pick_first';
*/
const CONNECTION_DELAY_INTERVAL_MS = 250;
export type MetricsListener = (loadReport: OrcaLoadReport__Output) => void;
export class PickFirstLoadBalancingConfig implements TypedLoadBalancingConfig {
constructor(private readonly shuffleAddressList: boolean) {}

View File

@ -29,7 +29,7 @@ import { LogVerbosity, Status } from './constants';
import { Deadline, formatDateDifference, getDeadlineTimeoutString } from './deadline';
import { InternalChannel } from './internal-channel';
import { Metadata } from './metadata';
import { PickResultType } from './picker';
import { OnCallEnded, PickResultType } from './picker';
import { CallConfig } from './resolver';
import { splitHostPort } from './uri-parser';
import * as logging from './logging';
@ -60,7 +60,7 @@ export class LoadBalancingCall implements Call, DeadlineInfoProvider {
private serviceUrl: string;
private metadata: Metadata | null = null;
private listener: InterceptingListener | null = null;
private onCallEnded: ((statusCode: Status) => void) | null = null;
private onCallEnded: OnCallEnded | null = null;
private startTime: Date;
private childStartTime: Date | null = null;
constructor(
@ -127,7 +127,7 @@ export class LoadBalancingCall implements Call, DeadlineInfoProvider {
);
const finalStatus = { ...status, progress };
this.listener?.onReceiveStatus(finalStatus);
this.onCallEnded?.(finalStatus.code);
this.onCallEnded?.(finalStatus.code, finalStatus.details, finalStatus.metadata);
}
}

View File

@ -89,6 +89,7 @@ export interface MetadataOptions {
export class Metadata {
protected internalRepr: MetadataObject = new Map<string, MetadataValue[]>();
private options: MetadataOptions;
private opaqueData: Map<string, unknown> = new Map();
constructor(options: MetadataOptions = {}) {
this.options = options;
@ -245,6 +246,27 @@ export class Metadata {
return result;
}
/**
* Attach additional data of any type to the metadata object, which will not
* be included when sending headers. The data can later be retrieved with
* `getOpaque`. Keys with the prefix `grpc` are reserved for use by this
* library.
* @param key
* @param value
*/
setOpaque(key: string, value: unknown) {
this.opaqueData.set(key, value);
}
/**
* Retrieve data previously added with `setOpaque`.
* @param key
* @returns
*/
getOpaque(key: string) {
return this.opaqueData.get(key);
}
/**
* Returns a new Metadata object based fields in a given IncomingHttpHeaders
* object.

View File

@ -15,7 +15,7 @@
*
*/
import { OrcaLoadReport } from "./generated/xds/data/orca/v3/OrcaLoadReport";
import { OrcaLoadReport, OrcaLoadReport__Output } from "./generated/xds/data/orca/v3/OrcaLoadReport";
import type { loadSync } from '@grpc/proto-loader';
import { ProtoGrpcType as OrcaProtoGrpcType } from "./generated/orca";
@ -25,6 +25,7 @@ import { durationMessageToDuration, durationToMs } from "./duration";
import { Server } from "./server";
import { ChannelCredentials } from "./channel-credentials";
import { Channel } from "./channel";
import { OnCallEnded } from "./picker";
const loadedOrcaProto: OrcaProtoGrpcType | null = null;
function loadOrcaProto(): OrcaProtoGrpcType {
@ -213,3 +214,35 @@ export function createOrcaClient(channel: Channel): OpenRcaServiceClient {
const ClientClass = loadOrcaProto().xds.service.orca.v3.OpenRcaService;
return new ClientClass('unused', ChannelCredentials.createInsecure(), {channelOverride: channel});
}
export type MetricsListener = (loadReport: OrcaLoadReport__Output) => void;
export const GRPC_METRICS_HEADER = 'endpoint-load-metrics-bin';
const PARSED_LOAD_REPORT_KEY = 'grpc_orca_load_report';
/**
* Create an onCallEnded callback for use in a picker.
* @param listener The listener to handle metrics, whenever they are provided.
* @param previousOnCallEnded The previous onCallEnded callback to propagate
* to, if applicable.
* @returns
*/
export function createMetricsReader(listener: MetricsListener, previousOnCallEnded: OnCallEnded | null): OnCallEnded {
return (code, details, metadata) => {
let parsedLoadReport = metadata.getOpaque(PARSED_LOAD_REPORT_KEY) as (OrcaLoadReport__Output | undefined);
if (parsedLoadReport) {
listener(parsedLoadReport);
} else {
const serializedLoadReport = metadata.get(GRPC_METRICS_HEADER);
if (serializedLoadReport.length > 0) {
const orcaProto = loadOrcaProto();
parsedLoadReport = orcaProto.xds.data.orca.v3.OrcaLoadReport.deserialize(serializedLoadReport[0] as Buffer);
listener(parsedLoadReport);
metadata.setOpaque(PARSED_LOAD_REPORT_KEY, parsedLoadReport);
}
}
if (previousOnCallEnded) {
previousOnCallEnded(code, details, metadata);
}
}
}

View File

@ -28,6 +28,8 @@ export enum PickResultType {
DROP,
}
export type OnCallEnded = (statusCode: Status, details: string, metadata: Metadata) => void;
export interface PickResult {
pickResultType: PickResultType;
/**
@ -42,7 +44,7 @@ export interface PickResult {
*/
status: StatusObject | null;
onCallStarted: (() => void) | null;
onCallEnded: ((statusCode: Status) => void) | null;
onCallEnded: OnCallEnded | null;
}
export interface CompletePickResult extends PickResult {
@ -50,7 +52,7 @@ export interface CompletePickResult extends PickResult {
subchannel: SubchannelInterface | null;
status: null;
onCallStarted: (() => void) | null;
onCallEnded: ((statusCode: Status) => void) | null;
onCallEnded: OnCallEnded | null;
}
export interface QueuePickResult extends PickResult {

View File

@ -35,7 +35,7 @@ import { CallEventTracker } from './transport';
import * as logging from './logging';
import { AuthContext } from './auth-context';
import { TLSSocket } from 'tls';
import { PerRequestMetricRecorder } from './orca';
import { GRPC_METRICS_HEADER, PerRequestMetricRecorder } from './orca';
const TRACER_NAME = 'server_call';
@ -491,7 +491,6 @@ const GRPC_ENCODING_HEADER = 'grpc-encoding';
const GRPC_MESSAGE_HEADER = 'grpc-message';
const GRPC_STATUS_HEADER = 'grpc-status';
const GRPC_TIMEOUT_HEADER = 'grpc-timeout';
const GRPC_METRICS_HEADER = 'endpoint-load-metrics-bin';
const DEADLINE_REGEX = /(\d{1,8})\s*([HMSmun])/;
const deadlineUnitsToMs: DeadlineUnitIndexSignature = {
H: 3600000,