grpc-js: Preserve order of metadata and messages with async interceptors

This commit is contained in:
Michael Lumish 2021-12-09 16:14:52 -05:00
parent db39ba245c
commit 7cccc39218
2 changed files with 68 additions and 11 deletions

View File

@ -149,6 +149,9 @@ export function isInterceptingListener(
} }
export class InterceptingListenerImpl implements InterceptingListener { export class InterceptingListenerImpl implements InterceptingListener {
private processingMetadata = false;
private hasPendingMessage = false;
private pendingMessage: any;
private processingMessage = false; private processingMessage = false;
private pendingStatus: StatusObject | null = null; private pendingStatus: StatusObject | null = null;
constructor( constructor(
@ -156,9 +159,27 @@ export class InterceptingListenerImpl implements InterceptingListener {
private nextListener: InterceptingListener private nextListener: InterceptingListener
) {} ) {}
private processPendingMessage() {
if (this.hasPendingMessage) {
this.nextListener.onReceiveMessage(this.pendingMessage);
this.pendingMessage = null;
this.hasPendingMessage = false;
}
}
private processPendingStatus() {
if (this.pendingStatus) {
this.nextListener.onReceiveStatus(this.pendingStatus);
}
}
onReceiveMetadata(metadata: Metadata): void { onReceiveMetadata(metadata: Metadata): void {
this.processingMetadata = true;
this.listener.onReceiveMetadata(metadata, (metadata) => { this.listener.onReceiveMetadata(metadata, (metadata) => {
this.processingMetadata = false;
this.nextListener.onReceiveMetadata(metadata); this.nextListener.onReceiveMetadata(metadata);
this.processPendingMessage();
this.processPendingStatus();
}); });
} }
// eslint-disable-next-line @typescript-eslint/no-explicit-any // eslint-disable-next-line @typescript-eslint/no-explicit-any
@ -168,15 +189,18 @@ export class InterceptingListenerImpl implements InterceptingListener {
this.processingMessage = true; this.processingMessage = true;
this.listener.onReceiveMessage(message, (msg) => { this.listener.onReceiveMessage(message, (msg) => {
this.processingMessage = false; this.processingMessage = false;
if (this.processingMetadata) {
this.pendingMessage = msg;
this.hasPendingMessage = true;
} else {
this.nextListener.onReceiveMessage(msg); this.nextListener.onReceiveMessage(msg);
if (this.pendingStatus) { this.processPendingStatus();
this.nextListener.onReceiveStatus(this.pendingStatus);
} }
}); });
} }
onReceiveStatus(status: StatusObject): void { onReceiveStatus(status: StatusObject): void {
this.listener.onReceiveStatus(status, (processedStatus) => { this.listener.onReceiveStatus(status, (processedStatus) => {
if (this.processingMessage) { if (this.processingMetadata || this.processingMessage) {
this.pendingStatus = processedStatus; this.pendingStatus = processedStatus;
} else { } else {
this.nextListener.onReceiveStatus(processedStatus); this.nextListener.onReceiveStatus(processedStatus);
@ -283,7 +307,7 @@ export class Http2CallStream implements Call {
private outputStatus() { private outputStatus() {
/* Precondition: this.finalStatus !== null */ /* Precondition: this.finalStatus !== null */
if (!this.statusOutput) { if (this.listener && !this.statusOutput) {
this.statusOutput = true; this.statusOutput = true;
const filteredStatus = this.filterStack.receiveTrailers( const filteredStatus = this.filterStack.receiveTrailers(
this.finalStatus! this.finalStatus!
@ -692,6 +716,7 @@ export class Http2CallStream implements Call {
this.trace('Sending metadata'); this.trace('Sending metadata');
this.listener = listener; this.listener = listener;
this.channel._startCallStream(this, metadata); this.channel._startCallStream(this, metadata);
this.maybeOutputStatus();
} }
private destroyHttp2Stream() { private destroyHttp2Stream() {

View File

@ -208,8 +208,18 @@ export class InterceptingCall implements InterceptingCallInterface {
*/ */
private requester: FullRequester; private requester: FullRequester;
/** /**
* Indicates that a message has been passed to the listener's onReceiveMessage * Indicates that metadata has been passed to the requester's start
* method it has not been passed to the corresponding next callback * method but it has not been passed to the corresponding next callback
*/
private processingMetadata = false;
/**
* Message context for a pending message that is waiting for
*/
private pendingMessageContext: MessageContext | null = null;
private pendingMessage: any;
/**
* Indicates that a message has been passed to the requester's sendMessage
* method but it has not been passed to the corresponding next callback
*/ */
private processingMessage = false; private processingMessage = false;
/** /**
@ -242,6 +252,21 @@ export class InterceptingCall implements InterceptingCallInterface {
getPeer() { getPeer() {
return this.nextCall.getPeer(); return this.nextCall.getPeer();
} }
private processPendingMessage() {
if (this.pendingMessageContext) {
this.nextCall.sendMessageWithContext(this.pendingMessageContext, this.pendingMessage);
this.pendingMessageContext = null;
this.pendingMessage = null;
}
}
private processPendingHalfClose() {
if (this.pendingHalfClose) {
this.nextCall.halfClose();
}
}
start( start(
metadata: Metadata, metadata: Metadata,
interceptingListener?: Partial<InterceptingListener> interceptingListener?: Partial<InterceptingListener>
@ -257,7 +282,9 @@ export class InterceptingCall implements InterceptingCallInterface {
interceptingListener?.onReceiveStatus?.bind(interceptingListener) ?? interceptingListener?.onReceiveStatus?.bind(interceptingListener) ??
((status) => {}), ((status) => {}),
}; };
this.processingMetadata = true;
this.requester.start(metadata, fullInterceptingListener, (md, listener) => { this.requester.start(metadata, fullInterceptingListener, (md, listener) => {
this.processingMetadata = false;
let finalInterceptingListener: InterceptingListener; let finalInterceptingListener: InterceptingListener;
if (isInterceptingListener(listener)) { if (isInterceptingListener(listener)) {
finalInterceptingListener = listener; finalInterceptingListener = listener;
@ -276,6 +303,8 @@ export class InterceptingCall implements InterceptingCallInterface {
); );
} }
this.nextCall.start(md, finalInterceptingListener); this.nextCall.start(md, finalInterceptingListener);
this.processPendingMessage();
this.processPendingHalfClose();
}); });
} }
// eslint-disable-next-line @typescript-eslint/no-explicit-any // eslint-disable-next-line @typescript-eslint/no-explicit-any
@ -283,9 +312,12 @@ export class InterceptingCall implements InterceptingCallInterface {
this.processingMessage = true; this.processingMessage = true;
this.requester.sendMessage(message, (finalMessage) => { this.requester.sendMessage(message, (finalMessage) => {
this.processingMessage = false; this.processingMessage = false;
if (this.processingMetadata) {
this.pendingMessageContext = context;
this.pendingMessage = message;
} else {
this.nextCall.sendMessageWithContext(context, finalMessage); this.nextCall.sendMessageWithContext(context, finalMessage);
if (this.pendingHalfClose) { this.processPendingHalfClose();
this.nextCall.halfClose();
} }
}); });
} }
@ -298,7 +330,7 @@ export class InterceptingCall implements InterceptingCallInterface {
} }
halfClose(): void { halfClose(): void {
this.requester.halfClose(() => { this.requester.halfClose(() => {
if (this.processingMessage) { if (this.processingMetadata || this.processingMessage) {
this.pendingHalfClose = true; this.pendingHalfClose = true;
} else { } else {
this.nextCall.halfClose(); this.nextCall.halfClose();