From c4c321d37dfca78a5b97f433652147628fa43186 Mon Sep 17 00:00:00 2001 From: Michael Lumish Date: Mon, 17 Oct 2022 11:32:22 -0700 Subject: [PATCH] grpc-js: Handle filters in ResolvingCall instead of LoadBalancingCall --- packages/grpc-js/src/internal-channel.ts | 2 +- packages/grpc-js/src/load-balancing-call.ts | 59 +++---------- packages/grpc-js/src/resolving-call.ts | 96 +++++++++++++++------ 3 files changed, 83 insertions(+), 74 deletions(-) diff --git a/packages/grpc-js/src/internal-channel.ts b/packages/grpc-js/src/internal-channel.ts index 2a775e7d..16f4b983 100644 --- a/packages/grpc-js/src/internal-channel.ts +++ b/packages/grpc-js/src/internal-channel.ts @@ -402,7 +402,7 @@ export class InternalChannel { method + '"' ); - return new LoadBalancingCall(this, callConfig, method, host, credentials, deadline, this.filterStackFactory, callNumber); + return new LoadBalancingCall(this, callConfig, method, host, credentials, deadline, callNumber); } createInnerCall( diff --git a/packages/grpc-js/src/load-balancing-call.ts b/packages/grpc-js/src/load-balancing-call.ts index 73c1fa9f..6faa1559 100644 --- a/packages/grpc-js/src/load-balancing-call.ts +++ b/packages/grpc-js/src/load-balancing-call.ts @@ -41,14 +41,11 @@ export interface StatusObjectWithProgress extends StatusObject { export class LoadBalancingCall implements Call { private child: SubchannelCall | null = null; private readPending = false; - private writeFilterPending = false; private pendingMessage: {context: MessageContext, message: Buffer} | null = null; private pendingHalfClose = false; - private readFilterPending = false; private pendingChildStatus: StatusObject | null = null; private ended = false; private serviceUrl: string; - private filterStack: FilterStack; private metadata: Metadata | null = null; private listener: InterceptingListener | null = null; private onCallEnded: ((statusCode: Status) => void) | null = null; @@ -59,11 +56,8 @@ export class LoadBalancingCall implements Call { private readonly host : string, private readonly credentials: CallCredentials, private readonly deadline: Deadline, - filterStackFactory: FilterStackFactory, private readonly callNumber: number ) { - this.filterStack = filterStackFactory.createFilter(); - const splitPath: string[] = this.methodName.split('/'); let serviceName = ''; /* The standard path format is "/{serviceName}/{methodName}", so if we split @@ -90,8 +84,7 @@ export class LoadBalancingCall implements Call { if (!this.ended) { this.ended = true; this.trace('ended with status: code=' + status.code + ' details="' + status.details + '"'); - const filteredStatus = this.filterStack.receiveTrailers(status); - const finalStatus = {...filteredStatus, progress}; + const finalStatus = {...status, progress}; this.listener?.onReceiveStatus(finalStatus); this.onCallEnded?.(finalStatus.code); } @@ -152,26 +145,13 @@ export class LoadBalancingCall implements Call { try { this.child = pickResult.subchannel!.getRealSubchannel().createCall(finalMetadata, this.host, this.methodName, { onReceiveMetadata: metadata => { - this.listener!.onReceiveMetadata(this.filterStack.receiveMetadata(metadata)); + this.listener!.onReceiveMetadata(metadata); }, onReceiveMessage: message => { - this.readFilterPending = true; - this.filterStack.receiveMessage(message).then(filteredMesssage => { - this.readFilterPending = false; - this.listener!.onReceiveMessage(filteredMesssage); - if (this.pendingChildStatus) { - this.outputStatus(this.pendingChildStatus, 'PROCESSED'); - } - }, (status: StatusObject) => { - this.cancelWithStatus(status.code, status.details); - }); + this.listener!.onReceiveMessage(message); }, onReceiveStatus: status => { - if (this.readFilterPending) { - this.pendingChildStatus = status; - } else { - this.outputStatus(status, 'PROCESSED'); - } + this.outputStatus(status, 'PROCESSED'); } }); } catch (error) { @@ -201,7 +181,7 @@ export class LoadBalancingCall implements Call { if (this.pendingMessage) { this.child.sendMessageWithContext(this.pendingMessage.context, this.pendingMessage.message); } - if (this.pendingHalfClose && !this.writeFilterPending) { + if (this.pendingHalfClose) { this.child.halfClose(); } }, (error: Error & { code: number }) => { @@ -249,29 +229,16 @@ export class LoadBalancingCall implements Call { start(metadata: Metadata, listener: InterceptingListener): void { this.trace('start called'); this.listener = listener; - this.filterStack.sendMetadata(Promise.resolve(metadata)).then(filteredMetadata => { - this.metadata = filteredMetadata; - this.doPick(); - }, (status: StatusObject) => { - this.outputStatus(status, 'PROCESSED'); - }); + this.metadata = metadata; + this.doPick(); } sendMessageWithContext(context: MessageContext, message: Buffer): void { this.trace('write() called with message of length ' + message.length); - this.writeFilterPending = true; - this.filterStack.sendMessage(Promise.resolve({message: message, flags: context.flags})).then((filteredMessage) => { - this.writeFilterPending = false; - if (this.child) { - this.child.sendMessageWithContext(context, filteredMessage.message); - if (this.pendingHalfClose) { - this.child.halfClose(); - } - } else { - this.pendingMessage = {context, message: filteredMessage.message}; - } - }, (status: StatusObject) => { - this.cancelWithStatus(status.code, status.details); - }) + if (this.child) { + this.child.sendMessageWithContext(context, message); + } else { + this.pendingMessage = {context, message}; + } } startRead(): void { this.trace('startRead called'); @@ -283,7 +250,7 @@ export class LoadBalancingCall implements Call { } halfClose(): void { this.trace('halfClose called'); - if (this.child && !this.writeFilterPending) { + if (this.child) { this.child.halfClose(); } else { this.pendingHalfClose = true; diff --git a/packages/grpc-js/src/resolving-call.ts b/packages/grpc-js/src/resolving-call.ts index 76a7bd20..f2e5741f 100644 --- a/packages/grpc-js/src/resolving-call.ts +++ b/packages/grpc-js/src/resolving-call.ts @@ -19,7 +19,7 @@ import { CallCredentials } from "./call-credentials"; import { Call, CallStreamOptions, InterceptingListener, MessageContext, StatusObject } from "./call-interface"; import { LogVerbosity, Propagate, Status } from "./constants"; import { Deadline, getDeadlineTimeoutString, getRelativeTimeout, minDeadline } from "./deadline"; -import { FilterStackFactory } from "./filter-stack"; +import { FilterStack, FilterStackFactory } from "./filter-stack"; import { InternalChannel } from "./internal-channel"; import { Metadata } from "./metadata"; import * as logging from './logging'; @@ -33,12 +33,16 @@ export class ResolvingCall implements Call { private pendingMessage: {context: MessageContext, message: Buffer} | null = null; private pendingHalfClose = false; private ended = false; + private readFilterPending = false; + private writeFilterPending = false; + private pendingChildStatus: StatusObject | null = null; private metadata: Metadata | null = null; private listener: InterceptingListener | null = null; private deadline: Deadline; private host: string; private statusWatchers: ((status: StatusObject) => void)[] = []; private deadlineTimer: NodeJS.Timer = setTimeout(() => {}, 0); + private filterStack: FilterStack | null = null; constructor( private readonly channel: InternalChannel, @@ -96,14 +100,35 @@ export class ResolvingCall implements Call { private outputStatus(status: StatusObject) { if (!this.ended) { this.ended = true; - this.trace('ended with status: code=' + status.code + ' details="' + status.details + '"'); - this.statusWatchers.forEach(watcher => watcher(status)); + if (!this.filterStack) { + this.filterStack = this.filterStackFactory.createFilter(); + } + const filteredStatus = this.filterStack.receiveTrailers(status); + this.trace('ended with status: code=' + filteredStatus.code + ' details="' + filteredStatus.details + '"'); + this.statusWatchers.forEach(watcher => watcher(filteredStatus)); process.nextTick(() => { - this.listener?.onReceiveStatus(status); + this.listener?.onReceiveStatus(filteredStatus); }); } } + private sendMessageOnChild(context: MessageContext, message: Buffer): void { + if (!this.child) { + throw new Error('sendMessageonChild called with child not populated'); + } + const child = this.child; + this.writeFilterPending = true; + this.filterStack!.sendMessage(Promise.resolve({message: message, flags: context.flags})).then((filteredMessage) => { + this.writeFilterPending = false; + child.sendMessageWithContext(context, filteredMessage.message); + if (this.pendingHalfClose) { + child.halfClose(); + } + }, (status: StatusObject) => { + this.cancelWithStatus(status.code, status.details); + }); + } + getConfig(): void { if (this.ended) { return; @@ -148,29 +173,46 @@ export class ResolvingCall implements Call { } this.filterStackFactory.push(config.dynamicFilterFactories); - - this.child = this.channel.createInnerCall(config, this.method, this.host, this.credentials, this.deadline); - this.child.start(this.metadata, { - onReceiveMetadata: metadata => { - this.listener!.onReceiveMetadata(metadata); - }, - onReceiveMessage: message => { - this.listener!.onReceiveMessage(message); - }, - onReceiveStatus: status => { - this.outputStatus(status); + this.filterStack = this.filterStackFactory.createFilter(); + this.filterStack.sendMetadata(Promise.resolve(this.metadata)).then(filteredMetadata => { + this.child = this.channel.createInnerCall(config, this.method, this.host, this.credentials, this.deadline); + this.child.start(filteredMetadata, { + onReceiveMetadata: metadata => { + this.listener!.onReceiveMetadata(this.filterStack!.receiveMetadata(metadata)); + }, + onReceiveMessage: message => { + this.readFilterPending = true; + this.filterStack!.receiveMessage(message).then(filteredMesssage => { + this.readFilterPending = false; + this.listener!.onReceiveMessage(filteredMesssage); + if (this.pendingChildStatus) { + this.outputStatus(this.pendingChildStatus); + } + }, (status: StatusObject) => { + this.cancelWithStatus(status.code, status.details); + }); + }, + onReceiveStatus: status => { + if (this.readFilterPending) { + this.pendingChildStatus = status; + } else { + this.outputStatus(status); + } + } + }); + if (this.readPending) { + this.child.startRead(); } - }); - if (this.readPending) { - this.child.startRead(); - } - if (this.pendingMessage) { - this.child.sendMessageWithContext(this.pendingMessage.context, this.pendingMessage.message); - } - if (this.pendingHalfClose) { - this.child.halfClose(); - } + if (this.pendingMessage) { + this.sendMessageOnChild(this.pendingMessage.context, this.pendingMessage.message); + } else if (this.pendingHalfClose) { + this.child.halfClose(); + } + }, (status: StatusObject) => { + this.outputStatus(status); + }) } + reportResolverError(status: StatusObject) { if (this.metadata?.getOptions().waitForReady) { this.channel.queueCallForConfig(this); @@ -195,7 +237,7 @@ export class ResolvingCall implements Call { sendMessageWithContext(context: MessageContext, message: Buffer): void { this.trace('write() called with message of length ' + message.length); if (this.child) { - this.child.sendMessageWithContext(context, message); + this.sendMessageOnChild(context, message); } else { this.pendingMessage = {context, message}; } @@ -210,7 +252,7 @@ export class ResolvingCall implements Call { } halfClose(): void { this.trace('halfClose called'); - if (this.child) { + if (this.child && !this.writeFilterPending) { this.child.halfClose(); } else { this.pendingHalfClose = true;