diff --git a/packages/grpc-js-xds/src/load-balancer-xds-cluster-impl.ts b/packages/grpc-js-xds/src/load-balancer-xds-cluster-impl.ts index d00c02ca..cd552d31 100644 --- a/packages/grpc-js-xds/src/load-balancer-xds-cluster-impl.ts +++ b/packages/grpc-js-xds/src/load-balancer-xds-cluster-impl.ts @@ -179,8 +179,8 @@ class XdsClusterImplPicker implements Picker { pickSubchannel?.getStatsObject()?.addCallStarted(); callCounterMap.startCall(this.callCounterMapKey); }, - onCallEnded: status => { - originalPick.onCallEnded?.(status); + onCallEnded: (status, details, metadata) => { + originalPick.onCallEnded?.(status, details, metadata); pickSubchannel?.getStatsObject()?.addCallFinished(status !== Status.OK) callCounterMap.endCall(this.callCounterMapKey); } diff --git a/packages/grpc-js/src/load-balancer-outlier-detection.ts b/packages/grpc-js/src/load-balancer-outlier-detection.ts index 9a0840da..4fa4b42f 100644 --- a/packages/grpc-js/src/load-balancer-outlier-detection.ts +++ b/packages/grpc-js/src/load-balancer-outlier-detection.ts @@ -428,13 +428,13 @@ class OutlierDetectionPicker implements Picker { if (mapEntry) { let onCallEnded = wrappedPick.onCallEnded; if (this.countCalls) { - onCallEnded = statusCode => { + onCallEnded = (statusCode, details, metadata) => { if (statusCode === Status.OK) { mapEntry.counter.addSuccess(); } else { mapEntry.counter.addFailure(); } - wrappedPick.onCallEnded?.(statusCode); + wrappedPick.onCallEnded?.(statusCode, details, metadata); }; } return { diff --git a/packages/grpc-js/src/load-balancer-pick-first.ts b/packages/grpc-js/src/load-balancer-pick-first.ts index ea4111a0..5eaf31c2 100644 --- a/packages/grpc-js/src/load-balancer-pick-first.ts +++ b/packages/grpc-js/src/load-balancer-pick-first.ts @@ -47,7 +47,7 @@ import { StatusOr, statusOrFromValue } from './call-interface'; import { OrcaLoadReport__Output } from './generated/xds/data/orca/v3/OrcaLoadReport'; import { OpenRcaServiceClient } from './generated/xds/service/orca/v3/OpenRcaService'; import { ClientReadableStream, ServiceError } from './call'; -import { createOrcaClient } from './orca'; +import { createOrcaClient, MetricsListener } from './orca'; import { msToDuration } from './duration'; import { BackoffTimeout } from './backoff-timeout'; @@ -65,8 +65,6 @@ const TYPE_NAME = 'pick_first'; */ const CONNECTION_DELAY_INTERVAL_MS = 250; -export type MetricsListener = (loadReport: OrcaLoadReport__Output) => void; - export class PickFirstLoadBalancingConfig implements TypedLoadBalancingConfig { constructor(private readonly shuffleAddressList: boolean) {} diff --git a/packages/grpc-js/src/load-balancing-call.ts b/packages/grpc-js/src/load-balancing-call.ts index b3764e02..3ff72898 100644 --- a/packages/grpc-js/src/load-balancing-call.ts +++ b/packages/grpc-js/src/load-balancing-call.ts @@ -29,7 +29,7 @@ import { LogVerbosity, Status } from './constants'; import { Deadline, formatDateDifference, getDeadlineTimeoutString } from './deadline'; import { InternalChannel } from './internal-channel'; import { Metadata } from './metadata'; -import { PickResultType } from './picker'; +import { OnCallEnded, PickResultType } from './picker'; import { CallConfig } from './resolver'; import { splitHostPort } from './uri-parser'; import * as logging from './logging'; @@ -60,7 +60,7 @@ export class LoadBalancingCall implements Call, DeadlineInfoProvider { private serviceUrl: string; private metadata: Metadata | null = null; private listener: InterceptingListener | null = null; - private onCallEnded: ((statusCode: Status) => void) | null = null; + private onCallEnded: OnCallEnded | null = null; private startTime: Date; private childStartTime: Date | null = null; constructor( @@ -127,7 +127,7 @@ export class LoadBalancingCall implements Call, DeadlineInfoProvider { ); const finalStatus = { ...status, progress }; this.listener?.onReceiveStatus(finalStatus); - this.onCallEnded?.(finalStatus.code); + this.onCallEnded?.(finalStatus.code, finalStatus.details, finalStatus.metadata); } } diff --git a/packages/grpc-js/src/metadata.ts b/packages/grpc-js/src/metadata.ts index d59b6cc2..7ae68ba3 100644 --- a/packages/grpc-js/src/metadata.ts +++ b/packages/grpc-js/src/metadata.ts @@ -89,6 +89,7 @@ export interface MetadataOptions { export class Metadata { protected internalRepr: MetadataObject = new Map(); private options: MetadataOptions; + private opaqueData: Map = new Map(); constructor(options: MetadataOptions = {}) { this.options = options; @@ -245,6 +246,27 @@ export class Metadata { return result; } + /** + * Attach additional data of any type to the metadata object, which will not + * be included when sending headers. The data can later be retrieved with + * `getOpaque`. Keys with the prefix `grpc` are reserved for use by this + * library. + * @param key + * @param value + */ + setOpaque(key: string, value: unknown) { + this.opaqueData.set(key, value); + } + + /** + * Retrieve data previously added with `setOpaque`. + * @param key + * @returns + */ + getOpaque(key: string) { + return this.opaqueData.get(key); + } + /** * Returns a new Metadata object based fields in a given IncomingHttpHeaders * object. diff --git a/packages/grpc-js/src/orca.ts b/packages/grpc-js/src/orca.ts index 424d301f..b7515d4f 100644 --- a/packages/grpc-js/src/orca.ts +++ b/packages/grpc-js/src/orca.ts @@ -15,7 +15,7 @@ * */ -import { OrcaLoadReport } from "./generated/xds/data/orca/v3/OrcaLoadReport"; +import { OrcaLoadReport, OrcaLoadReport__Output } from "./generated/xds/data/orca/v3/OrcaLoadReport"; import type { loadSync } from '@grpc/proto-loader'; import { ProtoGrpcType as OrcaProtoGrpcType } from "./generated/orca"; @@ -25,6 +25,7 @@ import { durationMessageToDuration, durationToMs } from "./duration"; import { Server } from "./server"; import { ChannelCredentials } from "./channel-credentials"; import { Channel } from "./channel"; +import { OnCallEnded } from "./picker"; const loadedOrcaProto: OrcaProtoGrpcType | null = null; function loadOrcaProto(): OrcaProtoGrpcType { @@ -213,3 +214,35 @@ export function createOrcaClient(channel: Channel): OpenRcaServiceClient { const ClientClass = loadOrcaProto().xds.service.orca.v3.OpenRcaService; return new ClientClass('unused', ChannelCredentials.createInsecure(), {channelOverride: channel}); } + +export type MetricsListener = (loadReport: OrcaLoadReport__Output) => void; + +export const GRPC_METRICS_HEADER = 'endpoint-load-metrics-bin'; +const PARSED_LOAD_REPORT_KEY = 'grpc_orca_load_report'; + +/** + * Create an onCallEnded callback for use in a picker. + * @param listener The listener to handle metrics, whenever they are provided. + * @param previousOnCallEnded The previous onCallEnded callback to propagate + * to, if applicable. + * @returns + */ +export function createMetricsReader(listener: MetricsListener, previousOnCallEnded: OnCallEnded | null): OnCallEnded { + return (code, details, metadata) => { + let parsedLoadReport = metadata.getOpaque(PARSED_LOAD_REPORT_KEY) as (OrcaLoadReport__Output | undefined); + if (parsedLoadReport) { + listener(parsedLoadReport); + } else { + const serializedLoadReport = metadata.get(GRPC_METRICS_HEADER); + if (serializedLoadReport.length > 0) { + const orcaProto = loadOrcaProto(); + parsedLoadReport = orcaProto.xds.data.orca.v3.OrcaLoadReport.deserialize(serializedLoadReport[0] as Buffer); + listener(parsedLoadReport); + metadata.setOpaque(PARSED_LOAD_REPORT_KEY, parsedLoadReport); + } + } + if (previousOnCallEnded) { + previousOnCallEnded(code, details, metadata); + } + } +} diff --git a/packages/grpc-js/src/picker.ts b/packages/grpc-js/src/picker.ts index ac79c9fe..fdf42fcf 100644 --- a/packages/grpc-js/src/picker.ts +++ b/packages/grpc-js/src/picker.ts @@ -28,6 +28,8 @@ export enum PickResultType { DROP, } +export type OnCallEnded = (statusCode: Status, details: string, metadata: Metadata) => void; + export interface PickResult { pickResultType: PickResultType; /** @@ -42,7 +44,7 @@ export interface PickResult { */ status: StatusObject | null; onCallStarted: (() => void) | null; - onCallEnded: ((statusCode: Status) => void) | null; + onCallEnded: OnCallEnded | null; } export interface CompletePickResult extends PickResult { @@ -50,7 +52,7 @@ export interface CompletePickResult extends PickResult { subchannel: SubchannelInterface | null; status: null; onCallStarted: (() => void) | null; - onCallEnded: ((statusCode: Status) => void) | null; + onCallEnded: OnCallEnded | null; } export interface QueuePickResult extends PickResult { diff --git a/packages/grpc-js/src/server-interceptors.ts b/packages/grpc-js/src/server-interceptors.ts index a38957f1..a7cddd93 100644 --- a/packages/grpc-js/src/server-interceptors.ts +++ b/packages/grpc-js/src/server-interceptors.ts @@ -35,7 +35,7 @@ import { CallEventTracker } from './transport'; import * as logging from './logging'; import { AuthContext } from './auth-context'; import { TLSSocket } from 'tls'; -import { PerRequestMetricRecorder } from './orca'; +import { GRPC_METRICS_HEADER, PerRequestMetricRecorder } from './orca'; const TRACER_NAME = 'server_call'; @@ -491,7 +491,6 @@ const GRPC_ENCODING_HEADER = 'grpc-encoding'; const GRPC_MESSAGE_HEADER = 'grpc-message'; const GRPC_STATUS_HEADER = 'grpc-status'; const GRPC_TIMEOUT_HEADER = 'grpc-timeout'; -const GRPC_METRICS_HEADER = 'endpoint-load-metrics-bin'; const DEADLINE_REGEX = /(\d{1,8})\s*([HMSmun])/; const deadlineUnitsToMs: DeadlineUnitIndexSignature = { H: 3600000,