grpc-js: Implement ORCA server-side OOB metrics

This commit is contained in:
Michael Lumish 2025-07-24 15:08:53 -07:00
parent eaa3f4d2bf
commit 78999ac16d
5 changed files with 263 additions and 14 deletions

View File

@ -20,6 +20,18 @@ export interface Duration {
nanos: number;
}
/**
 * Duration shape in which `seconds` is carried as a string rather than a
 * number. Convert it to a numeric `Duration` with `durationMessageToDuration`.
 */
export interface DurationMessage {
// Whole seconds, as text. NOTE(review): presumably a base-10 int64 as emitted by the proto loader — confirm.
seconds: string;
// NOTE(review): presumably the sub-second remainder in nanoseconds — confirm.
nanos: number;
}
/**
 * Converts a `DurationMessage` (string `seconds`) into a numeric `Duration`.
 *
 * The seconds string is always parsed as base 10. Without an explicit radix,
 * `Number.parseInt` treats a `0x`/`0X` prefix as hexadecimal.
 *
 * @param message Duration whose `seconds` field is a string.
 * @returns A `Duration` with numeric `seconds` and the same `nanos`. If
 *   `seconds` does not start with a decimal integer, `seconds` is `NaN`.
 */
export function durationMessageToDuration(message: DurationMessage): Duration {
  return {
    seconds: Number.parseInt(message.seconds, 10),
    nanos: message.nanos,
  };
}
export function msToDuration(millis: number): Duration {
return {
seconds: (millis / 1000) | 0,

View File

@ -20,6 +20,9 @@ import { OrcaLoadReport } from "./generated/xds/data/orca/v3/OrcaLoadReport";
import type { loadSync } from '@grpc/proto-loader';
import { ProtoGrpcType as OrcaProtoGrpcType } from "./generated/orca";
import { loadPackageDefinition } from "./make-client";
import { OpenRcaServiceHandlers } from "./generated/xds/service/orca/v3/OpenRcaService";
import { durationMessageToDuration, durationToMs } from "./duration";
import { Server } from "./server";
// Presumably a cache slot for the loaded ORCA proto definitions.
// NOTE(review): this binding is `const` and initialized to `null`, so it can
// never be reassigned to a loaded value. Confirm whether `let` was intended so
// that loadOrcaProto() can actually cache its result.
const loadedOrcaProto: OrcaProtoGrpcType | null = null;
function loadOrcaProto(): OrcaProtoGrpcType {
@ -47,7 +50,7 @@ function loadOrcaProto(): OrcaProtoGrpcType {
/**
* ORCA metrics recorder for a single request
*/
export class PerRequestMetricsRecorder {
export class PerRequestMetricRecorder {
private message: OrcaLoadReport = {};
/**
@ -131,3 +134,75 @@ export class PerRequestMetricsRecorder {
return orcaProto.xds.data.orca.v3.OrcaLoadReport.serialize(this.message);
}
}
/** Report interval used when the client requests none, or an unusable one. */
const DEFAULT_REPORT_INTERVAL_MS = 30_000;

/**
 * Largest delay Node.js timers accept. A larger delay (like one below 1 or
 * NaN) is silently replaced by 1ms.
 */
const MAX_REPORT_INTERVAL_MS = 2_147_483_647;

/**
 * Holds a single server-wide ORCA load report and serves it through the
 * OpenRcaService `StreamCoreMetrics` method. Every open stream periodically
 * receives the report as it stands at the moment the timer fires.
 */
export class ServerMetricRecorder {
  /** The report written to every open metrics stream. */
  private message: OrcaLoadReport = {};

  private serviceImplementation: OpenRcaServiceHandlers = {
    StreamCoreMetrics: call => {
      const requestedInterval = call.request.report_interval
        ? durationToMs(durationMessageToDuration(call.request.report_interval))
        : DEFAULT_REPORT_INTERVAL_MS;
      // The interval comes from the client. A zero, negative or NaN value
      // would be coerced to a 1ms timer, so fall back to the default for
      // those, and cap oversized values for the same reason.
      const reportInterval =
        Number.isFinite(requestedInterval) && requestedInterval > 0
          ? Math.min(requestedInterval, MAX_REPORT_INTERVAL_MS)
          : DEFAULT_REPORT_INTERVAL_MS;
      const reportTimer = setInterval(() => {
        call.write(this.message);
      }, reportInterval);
      const stopReporting = (): void => {
        clearInterval(reportTimer);
      };
      // Stop on cancellation and also when the stream closes for any other
      // reason, so the timer never outlives the call. clearInterval is
      // idempotent, so both events firing is harmless.
      call.on('cancelled', stopReporting);
      call.on('close', stopReporting);
    },
  };

  /**
   * Sets or overwrites one named utilization metric.
   * @param name Key in the report's `utilization` map.
   * @param value Value to store under that key.
   */
  putUtilizationMetric(name: string, value: number) {
    if (!this.message.utilization) {
      this.message.utilization = {};
    }
    this.message.utilization[name] = value;
  }

  /**
   * Replaces the whole `utilization` map with a shallow copy of `metrics`.
   * @param metrics New named utilization values.
   */
  setAllUtilizationMetrics(metrics: { [name: string]: number }) {
    this.message.utilization = { ...metrics };
  }

  /**
   * Removes one named utilization metric; a no-op if it is not present.
   * @param name Key to remove from the `utilization` map.
   */
  deleteUtilizationMetric(name: string) {
    delete this.message.utilization?.[name];
  }

  /** Sets the report's `cpu_utilization` field. */
  setCpuUtilizationMetric(value: number) {
    this.message.cpu_utilization = value;
  }

  /** Removes the report's `cpu_utilization` field. */
  deleteCpuUtilizationMetric() {
    delete this.message.cpu_utilization;
  }

  /** Sets the report's `application_utilization` field. */
  setApplicationUtilizationMetric(value: number) {
    this.message.application_utilization = value;
  }

  /** Removes the report's `application_utilization` field. */
  deleteApplicationUtilizationMetric() {
    delete this.message.application_utilization;
  }

  /** Sets the QPS metric, stored in the report's `rps_fractional` field. */
  setQpsMetric(value: number) {
    this.message.rps_fractional = value;
  }

  /** Removes the QPS metric (`rps_fractional`) from the report. */
  deleteQpsMetric() {
    delete this.message.rps_fractional;
  }

  /** Sets the report's `eps` field. */
  setEpsMetric(value: number) {
    this.message.eps = value;
  }

  /** Removes the report's `eps` field. */
  deleteEpsMetric() {
    delete this.message.eps;
  }

  /**
   * Registers the OpenRcaService implementation backed by this recorder.
   * @param server Server to add the service to.
   */
  addToServer(server: Server) {
    const serviceDefinition =
      loadOrcaProto().xds.service.orca.v3.OpenRcaService.service;
    server.addService(serviceDefinition, this.serviceImplementation);
  }
}

View File

@ -26,7 +26,7 @@ import type { StatusObject, PartialStatusObject } from './call-interface';
import type { Deadline } from './deadline';
import type { ServerInterceptingCallInterface } from './server-interceptors';
import { AuthContext } from './auth-context';
import { PerRequestMetricsRecorder } from './orca';
import { PerRequestMetricRecorder } from './orca';
export type ServerStatusResponse = Partial<StatusObject>;
@ -41,7 +41,7 @@ export type ServerSurfaceCall = {
getPath(): string;
getHost(): string;
getAuthContext(): AuthContext;
getMetricsRecorder(): PerRequestMetricsRecorder;
getMetricsRecorder(): PerRequestMetricRecorder;
} & EventEmitter;
export type ServerUnaryCall<RequestType, ResponseType> = ServerSurfaceCall & {
@ -123,7 +123,7 @@ export class ServerUnaryCallImpl<RequestType, ResponseType>
return this.call.getAuthContext();
}
getMetricsRecorder(): PerRequestMetricsRecorder {
getMetricsRecorder(): PerRequestMetricRecorder {
return this.call.getMetricsRecorder();
}
}
@ -171,7 +171,7 @@ export class ServerReadableStreamImpl<RequestType, ResponseType>
return this.call.getAuthContext();
}
getMetricsRecorder(): PerRequestMetricsRecorder {
getMetricsRecorder(): PerRequestMetricRecorder {
return this.call.getMetricsRecorder();
}
}
@ -227,7 +227,7 @@ export class ServerWritableStreamImpl<RequestType, ResponseType>
return this.call.getAuthContext();
}
getMetricsRecorder(): PerRequestMetricsRecorder {
getMetricsRecorder(): PerRequestMetricRecorder {
return this.call.getMetricsRecorder();
}
@ -308,7 +308,7 @@ export class ServerDuplexStreamImpl<RequestType, ResponseType>
return this.call.getAuthContext();
}
getMetricsRecorder(): PerRequestMetricsRecorder {
getMetricsRecorder(): PerRequestMetricRecorder {
return this.call.getMetricsRecorder();
}

View File

@ -35,7 +35,7 @@ import { CallEventTracker } from './transport';
import * as logging from './logging';
import { AuthContext } from './auth-context';
import { TLSSocket } from 'tls';
import { PerRequestMetricsRecorder } from './orca';
import { PerRequestMetricRecorder } from './orca';
const TRACER_NAME = 'server_call';
@ -355,7 +355,7 @@ export interface ServerInterceptingCallInterface {
* the server was constructed with the `grpc.server_call_metric_recording`
* option.
*/
getMetricsRecorder(): PerRequestMetricsRecorder;
getMetricsRecorder(): PerRequestMetricRecorder;
}
export class ServerInterceptingCall implements ServerInterceptingCallInterface {
@ -470,7 +470,7 @@ export class ServerInterceptingCall implements ServerInterceptingCallInterface {
getConnectionInfo(): ConnectionInfo {
return this.nextCall.getConnectionInfo();
}
getMetricsRecorder(): PerRequestMetricsRecorder {
getMetricsRecorder(): PerRequestMetricRecorder {
return this.nextCall.getMetricsRecorder();
}
}
@ -545,7 +545,7 @@ export class BaseServerInterceptingCall
private streamEnded = false;
private host: string;
private connectionInfo: ConnectionInfo;
private metricsRecorder = new PerRequestMetricsRecorder();
private metricsRecorder = new PerRequestMetricRecorder();
private shouldSendMetrics: boolean;
constructor(
@ -1036,7 +1036,7 @@ export class BaseServerInterceptingCall
getConnectionInfo(): ConnectionInfo {
return this.connectionInfo;
}
getMetricsRecorder(): PerRequestMetricsRecorder {
getMetricsRecorder(): PerRequestMetricRecorder {
return this.metricsRecorder;
}
}

View File

@ -22,8 +22,11 @@ import * as grpc from '../src';
import { ServiceClient } from '../src/make-client';
import { assert2, loadProtoFile } from './common';
import { ProtoGrpcType as OrcaProtoGrpcType } from "../src/generated/orca";
import { PerRequestMetricsRecorder } from '../src/orca';
import { PerRequestMetricRecorder, ServerMetricRecorder } from '../src/orca';
import { loadSync } from '@grpc/proto-loader';
import { OpenRcaServiceClient } from '../src/generated/xds/service/orca/v3/OpenRcaService';
import { OrcaLoadReport__Output } from '../src/generated/xds/data/orca/v3/OrcaLoadReport';
import { msToDuration } from '../src/duration';
const GRPC_METRICS_HEADER = 'endpoint-load-metrics-bin';
const protoFile = path.join(__dirname, 'fixtures', 'echo_service.proto');
@ -44,7 +47,7 @@ const loadedProto = loadSync('xds/service/orca/v3/orca.proto', {
});
const orcaProto = grpc.loadPackageDefinition(loadedProto) as unknown as OrcaProtoGrpcType;
let setMetrics: (metricsRecorder: PerRequestMetricsRecorder) => void = () => {};
let setMetrics: (metricsRecorder: PerRequestMetricRecorder) => void = () => {};
const serviceImpl = {
echo: (
@ -298,4 +301,163 @@ describe('ORCA', () => {
});
});
});
describe('server-side out of band metrics', () => {
  let metricRecorder: ServerMetricRecorder;
  let server: grpc.Server;
  let client: OpenRcaServiceClient;
  let call: grpc.ClientReadableStream<OrcaLoadReport__Output> | null = null;

  /**
   * Opens a metrics stream with a 10ms report interval and runs the i-th
   * check against the i-th report received. `done` is called exactly once:
   * with the error if a check throws, otherwise with no argument after the
   * last check passes. Reports arriving after that are ignored, so a report
   * that lands before afterEach cancels the call cannot call `done` twice.
   */
  function expectReports(
    checks: Array<(report: OrcaLoadReport__Output) => void>,
    done: (error?: unknown) => void
  ): void {
    const stream = client.streamCoreMetrics({
      report_interval: msToDuration(10),
    });
    call = stream;
    // Swallow stream errors; presumably this covers the status raised when
    // afterEach cancels the call.
    stream.on('error', () => {});
    let nextCheck = 0;
    stream.on('data', (report: OrcaLoadReport__Output) => {
      const check = checks[nextCheck];
      if (check === undefined) {
        return;
      }
      try {
        check(report);
      } catch (error) {
        // Report the failure through mocha instead of throwing inside the
        // event emitter, and stop processing further reports.
        nextCheck = checks.length;
        done(error);
        return;
      }
      nextCheck += 1;
      if (nextCheck === checks.length) {
        done();
      }
    });
  }

  beforeEach(done => {
    metricRecorder = new ServerMetricRecorder();
    server = new grpc.Server();
    metricRecorder.addToServer(server);
    server.bindAsync(
      'localhost:0',
      grpc.ServerCredentials.createInsecure(),
      (error, port) => {
        if (error) {
          done(error);
          return;
        }
        client = new orcaProto.xds.service.orca.v3.OpenRcaService(
          `localhost:${port}`,
          grpc.credentials.createInsecure()
        );
        done();
      }
    );
  });

  afterEach(done => {
    call?.cancel();
    call = null;
    client.close();
    server.tryShutdown(done);
  });

  it('Should send utilization metrics', done => {
    metricRecorder.putUtilizationMetric('test', 123);
    expectReports(
      [report => assert.strictEqual(report.utilization.test, 123)],
      done
    );
  });

  it('Should set all utilization metrics', done => {
    metricRecorder.setAllUtilizationMetrics({ test1: 123, test2: 456 });
    expectReports(
      [
        report =>
          assert.deepStrictEqual(report.utilization, { test1: 123, test2: 456 }),
      ],
      done
    );
  });

  it('Should delete utilization metrics', done => {
    metricRecorder.putUtilizationMetric('test', 123);
    expectReports(
      [
        report => {
          assert.strictEqual(report.utilization.test, 123);
          metricRecorder.deleteUtilizationMetric('test');
        },
        report => assert.deepStrictEqual(report.utilization, {}),
      ],
      done
    );
  });

  it('Should set CPU utilization', done => {
    metricRecorder.setCpuUtilizationMetric(123);
    expectReports(
      [report => assert.strictEqual(report.cpu_utilization, 123)],
      done
    );
  });

  it('Should delete CPU utilization', done => {
    metricRecorder.setCpuUtilizationMetric(123);
    expectReports(
      [
        report => {
          assert.strictEqual(report.cpu_utilization, 123);
          metricRecorder.deleteCpuUtilizationMetric();
        },
        report => assert.strictEqual(report.cpu_utilization, 0),
      ],
      done
    );
  });

  it('Should set application utilization', done => {
    metricRecorder.setApplicationUtilizationMetric(123);
    expectReports(
      [report => assert.strictEqual(report.application_utilization, 123)],
      done
    );
  });

  it('Should delete application utilization', done => {
    metricRecorder.setApplicationUtilizationMetric(123);
    expectReports(
      [
        report => {
          assert.strictEqual(report.application_utilization, 123);
          metricRecorder.deleteApplicationUtilizationMetric();
        },
        report => assert.strictEqual(report.application_utilization, 0),
      ],
      done
    );
  });

  it('Should set QPS metric', done => {
    metricRecorder.setQpsMetric(123);
    expectReports(
      [report => assert.strictEqual(report.rps_fractional, 123)],
      done
    );
  });

  it('Should delete QPS metric', done => {
    metricRecorder.setQpsMetric(123);
    expectReports(
      [
        report => {
          assert.strictEqual(report.rps_fractional, 123);
          metricRecorder.deleteQpsMetric();
        },
        report => assert.strictEqual(report.rps_fractional, 0),
      ],
      done
    );
  });

  it('Should set EPS metric', done => {
    metricRecorder.setEpsMetric(123);
    expectReports([report => assert.strictEqual(report.eps, 123)], done);
  });

  // Title fixed: this case exercises EPS, not QPS.
  it('Should delete EPS metric', done => {
    metricRecorder.setEpsMetric(123);
    expectReports(
      [
        report => {
          assert.strictEqual(report.eps, 123);
          metricRecorder.deleteEpsMetric();
        },
        report => assert.strictEqual(report.eps, 0),
      ],
      done
    );
  });
});
});