From 7cccc392181050f1e7ed7334f148d35dbe6b7b5f Mon Sep 17 00:00:00 2001 From: Michael Lumish Date: Thu, 9 Dec 2021 16:14:52 -0500 Subject: [PATCH] grpc-js: Preserve order of metadata and messages with async interceptors --- packages/grpc-js/src/call-stream.ts | 35 +++++++++++++--- packages/grpc-js/src/client-interceptors.ts | 44 ++++++++++++++++++--- 2 files changed, 68 insertions(+), 11 deletions(-) diff --git a/packages/grpc-js/src/call-stream.ts b/packages/grpc-js/src/call-stream.ts index 70a9ac6e..915d812f 100644 --- a/packages/grpc-js/src/call-stream.ts +++ b/packages/grpc-js/src/call-stream.ts @@ -149,6 +149,9 @@ export function isInterceptingListener( } export class InterceptingListenerImpl implements InterceptingListener { + private processingMetadata = false; + private hasPendingMessage = false; + private pendingMessage: any; private processingMessage = false; private pendingStatus: StatusObject | null = null; constructor( @@ -156,9 +159,27 @@ export class InterceptingListenerImpl implements InterceptingListener { private nextListener: InterceptingListener ) {} + private processPendingMessage() { + if (this.hasPendingMessage) { + this.nextListener.onReceiveMessage(this.pendingMessage); + this.pendingMessage = null; + this.hasPendingMessage = false; + } + } + + private processPendingStatus() { + if (this.pendingStatus) { + this.nextListener.onReceiveStatus(this.pendingStatus); + } + } + onReceiveMetadata(metadata: Metadata): void { + this.processingMetadata = true; this.listener.onReceiveMetadata(metadata, (metadata) => { + this.processingMetadata = false; this.nextListener.onReceiveMetadata(metadata); + this.processPendingMessage(); + this.processPendingStatus(); }); } // eslint-disable-next-line @typescript-eslint/no-explicit-any @@ -168,15 +189,18 @@ export class InterceptingListenerImpl implements InterceptingListener { this.processingMessage = true; this.listener.onReceiveMessage(message, (msg) => { this.processingMessage = false; - this.nextListener.onReceiveMessage(msg); - if (this.pendingStatus) { - this.nextListener.onReceiveStatus(this.pendingStatus); + if (this.processingMetadata) { + this.pendingMessage = msg; + this.hasPendingMessage = true; + } else { + this.nextListener.onReceiveMessage(msg); + this.processPendingStatus(); } }); } onReceiveStatus(status: StatusObject): void { this.listener.onReceiveStatus(status, (processedStatus) => { - if (this.processingMessage) { + if (this.processingMetadata || this.processingMessage) { this.pendingStatus = processedStatus; } else { this.nextListener.onReceiveStatus(processedStatus); @@ -283,7 +307,7 @@ export class Http2CallStream implements Call { private outputStatus() { /* Precondition: this.finalStatus !== null */ - if (!this.statusOutput) { + if (this.listener && !this.statusOutput) { this.statusOutput = true; const filteredStatus = this.filterStack.receiveTrailers( this.finalStatus! @@ -692,6 +716,7 @@ export class Http2CallStream implements Call { this.trace('Sending metadata'); this.listener = listener; this.channel._startCallStream(this, metadata); + this.maybeOutputStatus(); } private destroyHttp2Stream() { diff --git a/packages/grpc-js/src/client-interceptors.ts b/packages/grpc-js/src/client-interceptors.ts index 5dfbeba1..ddb296ff 100644 --- a/packages/grpc-js/src/client-interceptors.ts +++ b/packages/grpc-js/src/client-interceptors.ts @@ -208,8 +208,18 @@ export class InterceptingCall implements InterceptingCallInterface { */ private requester: FullRequester; /** - * Indicates that a message has been passed to the listener's onReceiveMessage - * method it has not been passed to the corresponding next callback + * Indicates that metadata has been passed to the requester's start + * method but it has not been passed to the corresponding next callback + */ + private processingMetadata = false; + /** + * Message context for a pending message that is waiting for + */ + private pendingMessageContext: MessageContext | null = null; + private pendingMessage: any; + /** + * Indicates that a message has been passed to the requester's sendMessage + * method but it has not been passed to the corresponding next callback */ private processingMessage = false; /** @@ -242,6 +252,21 @@ export class InterceptingCall implements InterceptingCallInterface { getPeer() { return this.nextCall.getPeer(); } + + private processPendingMessage() { + if (this.pendingMessageContext) { + this.nextCall.sendMessageWithContext(this.pendingMessageContext, this.pendingMessage); + this.pendingMessageContext = null; + this.pendingMessage = null; + } + } + + private processPendingHalfClose() { + if (this.pendingHalfClose) { + this.nextCall.halfClose(); + } + } + start( metadata: Metadata, interceptingListener?: Partial @@ -257,7 +282,9 @@ export class InterceptingCall implements InterceptingCallInterface { interceptingListener?.onReceiveStatus?.bind(interceptingListener) ?? ((status) => {}), }; + this.processingMetadata = true; this.requester.start(metadata, fullInterceptingListener, (md, listener) => { + this.processingMetadata = false; let finalInterceptingListener: InterceptingListener; if (isInterceptingListener(listener)) { finalInterceptingListener = listener; @@ -276,6 +303,8 @@ export class InterceptingCall implements InterceptingCallInterface { ); } this.nextCall.start(md, finalInterceptingListener); + this.processPendingMessage(); + this.processPendingHalfClose(); }); } // eslint-disable-next-line @typescript-eslint/no-explicit-any @@ -283,9 +312,12 @@ export class InterceptingCall implements InterceptingCallInterface { this.processingMessage = true; this.requester.sendMessage(message, (finalMessage) => { this.processingMessage = false; - this.nextCall.sendMessageWithContext(context, finalMessage); - if (this.pendingHalfClose) { - this.nextCall.halfClose(); + if (this.processingMetadata) { + this.pendingMessageContext = context; + this.pendingMessage = message; + } else { + this.nextCall.sendMessageWithContext(context, finalMessage); + this.processPendingHalfClose(); } }); } @@ -298,7 +330,7 @@ export class InterceptingCall implements InterceptingCallInterface { } halfClose(): void { this.requester.halfClose(() => { - if (this.processingMessage) { + if (this.processingMetadata || this.processingMessage) { this.pendingHalfClose = true; } else { this.nextCall.halfClose();