From c3d7453a84cc2c42b743f92928af44269d4de542 Mon Sep 17 00:00:00 2001 From: cjihrig Date: Sun, 9 Jun 2019 18:12:24 -0700 Subject: [PATCH] grpc-js: handle multiple messages in single 'data' event This commit adds support for receiving multiple messages in a single 'data' event from the underlying HTTP2 stream. It also handles potential out of order messages due to asynchronous deserialization of messages. --- packages/grpc-js/src/server-call.ts | 114 ++++++++++++++++++++++------ 1 file changed, 91 insertions(+), 23 deletions(-) diff --git a/packages/grpc-js/src/server-call.ts b/packages/grpc-js/src/server-call.ts index 47827d49..23d6b247 100644 --- a/packages/grpc-js/src/server-call.ts +++ b/packages/grpc-js/src/server-call.ts @@ -117,6 +117,10 @@ export class ServerReadableStreamImpl } _read(size: number) { + if (!this.call.consumeUnpushedMessages(this)) { + return; + } + this.call.resume(); } @@ -324,6 +328,10 @@ export class Http2ServerCallStream< deadline: NodeJS.Timer = noopTimer; private wantTrailers = false; private metadataSent = false; + private canPush = false; + private isPushPending = false; + private bufferedMessages: Array = []; + private messagesToPush: Array = []; constructor( private stream: http2.ServerHttp2Stream, @@ -531,38 +539,98 @@ export class Http2ServerCallStream< ) { const decoder = new StreamDecoder(); - /* This code here is wrong but getting the client working is the priority - * right now and I'm not going to block that on fixing what is currently - * unused code. If multiple messages come in with a single frame, this will - * keep calling readable.push after it has returned false, which should not - * happen. Independent of that, deserializeMessage is asynchronous, which - * means that incoming messages could be reordered when emitted to the - * application. That effect will become more pronounced when compression - * support is added and deserializeMessage takes longer by an amount of - * time dependent on the size of the message. A system like the one in - * call-stream.ts should be added to buffer incoming messages and - * preserve ordering more strongly */ - this.stream.on('data', async (data: Buffer) => { const messages = decoder.write(data); - for (const message of messages) { - try { - const deserialized = await this.deserializeMessage(message); - if (!readable.push(deserialized)) { - this.stream.pause(); - } - } catch (err) { - err.code = Status.INTERNAL; - readable.emit('error', err); - } + for (const message of messages) { + this.pushOrBufferMessage(readable, message); } }); this.stream.once('end', () => { - readable.push(null); + this.pushOrBufferMessage(readable, null); }); } + + consumeUnpushedMessages( + readable: + | ServerReadableStream + | ServerDuplexStream + ): boolean { + this.canPush = true; + + while (this.messagesToPush.length > 0) { + const nextMessage = this.messagesToPush.shift(); + const canPush = readable.push(nextMessage); + + if (nextMessage === null || canPush === false) { + this.canPush = false; + break; + } + } + + return this.canPush; + } + + private pushOrBufferMessage( + readable: + | ServerReadableStream + | ServerDuplexStream, + messageBytes: Buffer | null + ): void { + if (this.isPushPending) { + this.bufferedMessages.push(messageBytes); + } else { + this.pushMessage(readable, messageBytes); + } + } + + private async pushMessage( + readable: + | ServerReadableStream + | ServerDuplexStream, + messageBytes: Buffer | null + ) { + if (messageBytes === null) { + if (this.canPush) { + readable.push(null); + } else { + this.messagesToPush.push(null); + } + + return; + } + + this.isPushPending = true; + + try { + const deserialized = await this.deserializeMessage(messageBytes); + + if (this.canPush) { + if (!readable.push(deserialized)) { + this.canPush = false; + this.stream.pause(); + } + } else { + this.messagesToPush.push(deserialized); + } + } catch (err) { + // Ignore any remaining messages when errors occur. + this.bufferedMessages.length = 0; + + err.code = Status.INTERNAL; + readable.emit('error', err); + } + + this.isPushPending = false; + + if (this.bufferedMessages.length > 0) { + this.pushMessage( + readable, + this.bufferedMessages.shift() as Buffer | null + ); + } + } } // tslint:disable:no-any