diff --git a/packages/grpc-js/src/index.ts b/packages/grpc-js/src/index.ts index e7001db1..8cac9111 100644 --- a/packages/grpc-js/src/index.ts +++ b/packages/grpc-js/src/index.ts @@ -285,6 +285,8 @@ export { ServerInterceptor, } from './server-interceptors'; +export { ServerMetricRecorder } from './orca'; + import * as experimental from './experimental'; export { experimental }; diff --git a/packages/grpc-js/src/load-balancer-pick-first.ts b/packages/grpc-js/src/load-balancer-pick-first.ts index f0df79d2..ea4111a0 100644 --- a/packages/grpc-js/src/load-balancer-pick-first.ts +++ b/packages/grpc-js/src/load-balancer-pick-first.ts @@ -34,7 +34,7 @@ import { } from './picker'; import { Endpoint, SubchannelAddress, subchannelAddressToString } from './subchannel-address'; import * as logging from './logging'; -import { LogVerbosity } from './constants'; +import { LogVerbosity, Status } from './constants'; import { SubchannelInterface, ConnectivityStateListener, @@ -44,6 +44,12 @@ import { isTcpSubchannelAddress } from './subchannel-address'; import { isIPv6 } from 'net'; import { ChannelOptions } from './channel-options'; import { StatusOr, statusOrFromValue } from './call-interface'; +import { OrcaLoadReport__Output } from './generated/xds/data/orca/v3/OrcaLoadReport'; +import { OpenRcaServiceClient } from './generated/xds/service/orca/v3/OpenRcaService'; +import { ClientReadableStream, ServiceError } from './call'; +import { createOrcaClient } from './orca'; +import { msToDuration } from './duration'; +import { BackoffTimeout } from './backoff-timeout'; const TRACER_NAME = 'pick_first'; @@ -59,6 +65,8 @@ const TYPE_NAME = 'pick_first'; */ const CONNECTION_DELAY_INTERVAL_MS = 250; +export type MetricsListener = (loadReport: OrcaLoadReport__Output) => void; + export class PickFirstLoadBalancingConfig implements TypedLoadBalancingConfig { constructor(private readonly shuffleAddressList: boolean) {} @@ -239,6 +247,13 @@ export class PickFirstLoadBalancer implements LoadBalancer { private latestResolutionNote: string = ''; + private metricsListeners: Map = new Map(); + private orcaClient: OpenRcaServiceClient | null = null; + private metricsCall: ClientReadableStream | null = null; + private currentMetricsIntervalMs: number = Infinity; + private orcaUnsupported = false; + private metricsBackoffTimer = new BackoffTimeout(() => this.updateMetricsSubscription()); + /** * Load balancer that attempts to connect to each backend in the address list * in order, and picks the first one that connects, using it for every @@ -336,6 +351,12 @@ export class PickFirstLoadBalancer implements LoadBalancer { this.currentPick.removeHealthStateWatcher( this.pickedSubchannelHealthListener ); + this.orcaClient?.close(); + this.orcaClient = null; + this.metricsCall?.cancel(); + this.metricsCall = null; + this.metricsBackoffTimer.stop(); + this.metricsBackoffTimer.reset(); // Unref last, to avoid triggering listeners this.currentPick.unref(); this.currentPick = null; @@ -439,6 +460,7 @@ export class PickFirstLoadBalancer implements LoadBalancer { this.currentPick = subchannel; clearTimeout(this.connectionDelayTimeout); this.calculateAndReportNewState(); + this.updateMetricsSubscription(); } private updateState(newState: ConnectivityState, picker: Picker, errorMessage: string | null) { @@ -573,6 +595,67 @@ export class PickFirstLoadBalancer implements LoadBalancer { getTypeName(): string { return TYPE_NAME; } + + private getOrCreateOrcaClient(): OpenRcaServiceClient | null { + if (this.orcaClient) { + return this.orcaClient; + } + if (this.currentPick) { + const channel = this.currentPick.getChannel(); + this.orcaClient = createOrcaClient(channel); + return this.orcaClient; + } + return null; + } + + private updateMetricsSubscription() { + if (this.orcaUnsupported) { + return; + } + if (this.metricsListeners.size > 0) { + const newInterval = Math.min(...Array.from(this.metricsListeners.values())); + if (!this.metricsCall || newInterval !== this.currentMetricsIntervalMs) { + const orcaClient = this.getOrCreateOrcaClient(); + if (!orcaClient) { + return; + } + this.metricsCall?.cancel(); + this.currentMetricsIntervalMs = newInterval; + const metricsCall = orcaClient.streamCoreMetrics({report_interval: msToDuration(newInterval)}); + this.metricsCall = metricsCall; + metricsCall.on('data', (report: OrcaLoadReport__Output) => { + this.metricsListeners.forEach((interval, listener) => { + listener(report); + }); + }); + metricsCall.on('error', (error: ServiceError) => { + this.metricsCall = null; + if (error.code === Status.UNIMPLEMENTED) { + this.orcaUnsupported = true; + return; + } + if (error.code === Status.CANCELLED) { + return; + } + this.metricsBackoffTimer.runOnce(); + }); + } + } else { + this.metricsCall?.cancel(); + this.metricsCall = null; + this.currentMetricsIntervalMs = Infinity; + } + } + + addMetricsSubscription(listener: MetricsListener, intervalMs: number): void { + this.metricsListeners.set(listener, intervalMs); + this.updateMetricsSubscription(); + } + + removeMetricsSubscription(listener: MetricsListener): void { + this.metricsListeners.delete(listener); + this.updateMetricsSubscription(); + } } const LEAF_CONFIG = new PickFirstLoadBalancingConfig(false); @@ -650,6 +733,14 @@ export class LeafLoadBalancer { destroy() { this.pickFirstBalancer.destroy(); } + + addMetricsSubscription(listener: MetricsListener, intervalMs: number): void { + this.pickFirstBalancer.addMetricsSubscription(listener, intervalMs); + } + + removeMetricsSubscription(listener: MetricsListener): void { + this.pickFirstBalancer.removeMetricsSubscription(listener); + } } export function setup(): void { diff --git a/packages/grpc-js/src/orca.ts b/packages/grpc-js/src/orca.ts index 1ffbcd3c..424d301f 100644 --- a/packages/grpc-js/src/orca.ts +++ b/packages/grpc-js/src/orca.ts @@ -20,9 +20,11 @@ import { OrcaLoadReport } from "./generated/xds/data/orca/v3/OrcaLoadReport"; import type { loadSync } from '@grpc/proto-loader'; import { ProtoGrpcType as OrcaProtoGrpcType } from "./generated/orca"; import { loadPackageDefinition } from "./make-client"; -import { OpenRcaServiceHandlers } from "./generated/xds/service/orca/v3/OpenRcaService"; +import { OpenRcaServiceClient, OpenRcaServiceHandlers } from "./generated/xds/service/orca/v3/OpenRcaService"; import { durationMessageToDuration, durationToMs } from "./duration"; import { Server } from "./server"; +import { ChannelCredentials } from "./channel-credentials"; +import { Channel } from "./channel"; const loadedOrcaProto: OrcaProtoGrpcType | null = null; function loadOrcaProto(): OrcaProtoGrpcType { @@ -206,3 +208,8 @@ export class ServerMetricRecorder { server.addService(serviceDefinition, this.serviceImplementation); } } + +export function createOrcaClient(channel: Channel): OpenRcaServiceClient { + const ClientClass = loadOrcaProto().xds.service.orca.v3.OpenRcaService; + return new ClientClass('unused', ChannelCredentials.createInsecure(), {channelOverride: channel}); +} diff --git a/packages/grpc-js/src/single-subchannel-channel.ts b/packages/grpc-js/src/single-subchannel-channel.ts index f67f27d6..3b1e8629 100644 --- a/packages/grpc-js/src/single-subchannel-channel.ts +++ b/packages/grpc-js/src/single-subchannel-channel.ts @@ -24,12 +24,13 @@ import { ChannelOptions } from "./channel-options"; import { ChannelRef, ChannelzCallTracker, ChannelzChildrenTracker, ChannelzTrace, registerChannelzChannel, unregisterChannelzRef } from "./channelz"; import { ConnectivityState } from "./connectivity-state"; import { Propagate, Status } from "./constants"; +import { restrictControlPlaneStatusCode } from "./control-plane-status"; import { Deadline, getRelativeTimeout } from "./deadline"; import { Metadata } from "./metadata"; import { getDefaultAuthority } from "./resolver"; import { Subchannel } from "./subchannel"; import { SubchannelCall } from "./subchannel-call"; -import { GrpcUri, uriToString } from "./uri-parser"; +import { GrpcUri, splitHostPort, uriToString } from "./uri-parser"; class SubchannelCallWrapper implements Call { private childCall: SubchannelCall | null = null; @@ -38,7 +39,20 @@ class SubchannelCallWrapper implements Call { private readPending = false; private halfClosePending = false; private pendingStatus: StatusObject | null = null; + private serviceUrl: string; constructor(private subchannel: Subchannel, private method: string, private options: CallStreamOptions, private callNumber: number) { + const splitPath: string[] = this.method.split('/'); + let serviceName = ''; + /* The standard path format is "/{serviceName}/{methodName}", so if we split + * by '/', the first item should be empty and the second should be the + * service name */ + if (splitPath.length >= 2) { + serviceName = splitPath[1]; + } + const hostname = splitHostPort(this.options.host)?.host ?? 'localhost'; + /* Currently, call credentials are only allowed on HTTPS connections, so we + * can assume that the scheme is "https" */ + this.serviceUrl = `https://${hostname}/${serviceName}`; const timeout = getRelativeTimeout(options.deadline); if (timeout !== Infinity) { if (timeout <= 0) { @@ -79,16 +93,32 @@ class SubchannelCallWrapper implements Call { }); return; } - this.childCall = this.subchannel.createCall(metadata, this.options.host, this.method, listener); - if (this.readPending) { - this.childCall.startRead(); - } - if (this.pendingMessage) { - this.childCall.sendMessageWithContext(this.pendingMessage.context, this.pendingMessage.message); - } - if (this.halfClosePending) { - this.childCall.halfClose(); - } + this.subchannel.getCallCredentials() + .generateMetadata({method_name: this.method, service_url: this.serviceUrl}) + .then(credsMetadata => { + this.childCall = this.subchannel.createCall(credsMetadata, this.options.host, this.method, listener); + if (this.readPending) { + this.childCall.startRead(); + } + if (this.pendingMessage) { + this.childCall.sendMessageWithContext(this.pendingMessage.context, this.pendingMessage.message); + } + if (this.halfClosePending) { + this.childCall.halfClose(); + } + }, (error: Error & { code: number }) => { + const { code, details } = restrictControlPlaneStatusCode( + typeof error.code === 'number' ? error.code : Status.UNKNOWN, + `Getting metadata from plugin failed with error: ${error.message}` + ); + listener.onReceiveStatus( + { + code: code, + details: details, + metadata: new Metadata(), + } + ); + }); } sendMessageWithContext(context: MessageContext, message: Buffer): void { if (this.childCall) { diff --git a/packages/grpc-js/src/subchannel-interface.ts b/packages/grpc-js/src/subchannel-interface.ts index ddf37d04..39001c1b 100644 --- a/packages/grpc-js/src/subchannel-interface.ts +++ b/packages/grpc-js/src/subchannel-interface.ts @@ -16,6 +16,7 @@ */ import { CallCredentials } from './call-credentials'; +import { Channel } from './channel'; import type { SubchannelRef } from './channelz'; import { ConnectivityState } from './connectivity-state'; import { Subchannel } from './subchannel'; @@ -67,6 +68,10 @@ export interface SubchannelInterface { * subchannel. */ getCallCredentials(): CallCredentials; + /** + * Get a channel that can be used to make requests with just this + */ + getChannel(): Channel; } export abstract class BaseSubchannelWrapper implements SubchannelInterface { @@ -143,4 +148,7 @@ export abstract class BaseSubchannelWrapper implements SubchannelInterface { getCallCredentials(): CallCredentials { return this.child.getCallCredentials(); } + getChannel(): Channel { + return this.child.getChannel(); + } } diff --git a/packages/grpc-js/src/subchannel.ts b/packages/grpc-js/src/subchannel.ts index cdf72861..7dc39efa 100644 --- a/packages/grpc-js/src/subchannel.ts +++ b/packages/grpc-js/src/subchannel.ts @@ -47,6 +47,8 @@ import { SubchannelCallInterceptingListener } from './subchannel-call'; import { SubchannelCall } from './subchannel-call'; import { CallEventTracker, SubchannelConnector, Transport } from './transport'; import { CallCredentials } from './call-credentials'; +import { SingleSubchannelChannel } from './single-subchannel-channel'; +import { Channel } from './channel'; const TRACER_NAME = 'subchannel'; @@ -519,4 +521,8 @@ export class Subchannel implements SubchannelInterface { getCallCredentials(): CallCredentials { return this.secureConnector.getCallCredentials(); } + + getChannel(): Channel { + return new SingleSubchannelChannel(this, this.channelTarget, this.options); + } }