Merge pull request #912 from cjihrig/readable

grpc-js: handle multiple messages in single 'data' event
Michael Lumish 2019-06-10 13:59:47 -07:00 committed by GitHub
commit 370bfd4039
1 changed file with 91 additions and 23 deletions
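The problem being fixed: gRPC messages are length-prefixed, so a single HTTP/2 'data' event can carry any number of complete messages, and StreamDecoder returns them all at once. The old handler (visible in the removed lines below) kept calling readable.push() for each decoded message even after push() had returned false, ignoring backpressure, and, because deserializeMessage is asynchronous, messages could be reordered before reaching the application. The sketch below illustrates only the framing side, assuming the standard gRPC wire format of a 1-byte compressed flag plus a 4-byte big-endian length before each payload; decodeFrames and frame are illustrative helpers, not grpc-js APIs. A second sketch after the diff mirrors the buffering pattern that the new methods implement.

// Minimal framing sketch (not grpc-js code): split one Buffer into gRPC-style
// frames. Each frame is a 1-byte compressed flag, a 4-byte big-endian length,
// and then the payload, so one 'data' chunk may hold several messages.
function decodeFrames(chunk: Buffer): Buffer[] {
  const messages: Buffer[] = [];
  let offset = 0;
  while (offset + 5 <= chunk.length) {
    const length = chunk.readUInt32BE(offset + 1);
    if (offset + 5 + length > chunk.length) {
      break; // partial frame; a real decoder carries it over to the next chunk
    }
    messages.push(chunk.subarray(offset + 5, offset + 5 + length));
    offset += 5 + length;
  }
  return messages;
}

// Pack two messages into a single chunk, as one 'data' event might deliver them.
const frame = (payload: Buffer): Buffer => {
  const header = Buffer.alloc(5); // flag byte stays 0 (uncompressed)
  header.writeUInt32BE(payload.length, 1);
  return Buffer.concat([header, payload]);
};
const chunk = Buffer.concat([frame(Buffer.from('one')), frame(Buffer.from('two'))]);
console.log(decodeFrames(chunk).map((m) => m.toString())); // [ 'one', 'two' ]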


@@ -117,6 +117,10 @@ export class ServerReadableStreamImpl<RequestType, ResponseType>
}
_read(size: number) {
+if (!this.call.consumeUnpushedMessages(this)) {
+return;
+}
this.call.resume();
}
@@ -324,6 +328,10 @@ export class Http2ServerCallStream<
deadline: NodeJS.Timer = noopTimer;
private wantTrailers = false;
private metadataSent = false;
+private canPush = false;
+private isPushPending = false;
+private bufferedMessages: Array<Buffer | null> = [];
+private messagesToPush: Array<RequestType | null> = [];
constructor(
private stream: http2.ServerHttp2Stream,
@@ -531,38 +539,98 @@ export class Http2ServerCallStream<
) {
const decoder = new StreamDecoder();
-/* This code here is wrong but getting the client working is the priority
-* right now and I'm not going to block that on fixing what is currently
-* unused code. If multiple messages come in with a single frame, this will
-* keep calling readable.push after it has returned false, which should not
-* happen. Independent of that, deserializeMessage is asynchronous, which
-* means that incoming messages could be reordered when emitted to the
-* application. That effect will become more pronounced when compression
-* support is added and deserializeMessage takes longer by an amount of
-* time dependent on the size of the message. A system like the one in
-* call-stream.ts should be added to buffer incoming messages and
-* preserve ordering more strongly */
this.stream.on('data', async (data: Buffer) => {
const messages = decoder.write(data);
-for (const message of messages) {
-try {
-const deserialized = await this.deserializeMessage(message);
-if (!readable.push(deserialized)) {
-this.stream.pause();
-}
-} catch (err) {
-err.code = Status.INTERNAL;
-readable.emit('error', err);
-}
+for (const message of messages) {
+this.pushOrBufferMessage(readable, message);
}
});
this.stream.once('end', () => {
-readable.push(null);
+this.pushOrBufferMessage(readable, null);
});
}
+consumeUnpushedMessages(
+readable:
+| ServerReadableStream<RequestType, ResponseType>
+| ServerDuplexStream<RequestType, ResponseType>
+): boolean {
+this.canPush = true;
+while (this.messagesToPush.length > 0) {
+const nextMessage = this.messagesToPush.shift();
+const canPush = readable.push(nextMessage);
+if (nextMessage === null || canPush === false) {
+this.canPush = false;
+break;
+}
+}
+return this.canPush;
+}
+private pushOrBufferMessage(
+readable:
+| ServerReadableStream<RequestType, ResponseType>
+| ServerDuplexStream<RequestType, ResponseType>,
+messageBytes: Buffer | null
+): void {
+if (this.isPushPending) {
+this.bufferedMessages.push(messageBytes);
+} else {
+this.pushMessage(readable, messageBytes);
+}
+}
+private async pushMessage(
+readable:
+| ServerReadableStream<RequestType, ResponseType>
+| ServerDuplexStream<RequestType, ResponseType>,
+messageBytes: Buffer | null
+) {
+if (messageBytes === null) {
+if (this.canPush) {
+readable.push(null);
+} else {
+this.messagesToPush.push(null);
+}
+return;
+}
+this.isPushPending = true;
+try {
+const deserialized = await this.deserializeMessage(messageBytes);
+if (this.canPush) {
+if (!readable.push(deserialized)) {
+this.canPush = false;
+this.stream.pause();
+}
+} else {
+this.messagesToPush.push(deserialized);
+}
+} catch (err) {
+// Ignore any remaining messages when errors occur.
+this.bufferedMessages.length = 0;
+err.code = Status.INTERNAL;
+readable.emit('error', err);
+}
+this.isPushPending = false;
+if (this.bufferedMessages.length > 0) {
+this.pushMessage(
+readable,
+this.bufferedMessages.shift() as Buffer | null
+);
+}
+}
}
// tslint:disable:no-any
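Taken together, the new members form a small ordering- and backpressure-aware queue: pushOrBufferMessage defers any frame that arrives while a deserialization is still in flight, pushMessage stops pushing once readable.push() returns false and parks later results in messagesToPush, and consumeUnpushedMessages drains that queue from _read before the paused HTTP/2 stream is resumed. The standalone sketch below mirrors that pattern with made-up names (MessageSource, feed, slowParse) and plain strings instead of serialized protobuf messages; it is a simplified illustration of the technique, not the grpc-js implementation.

import { Readable } from 'stream';

// Pretend deserialization: bigger frames take longer, which is exactly the
// situation where naive concurrent awaits could emit results out of order.
function slowParse(frame: Buffer): Promise<string> {
  return new Promise((resolve) =>
    setTimeout(() => resolve(frame.toString()), frame.length)
  );
}

class MessageSource extends Readable {
  private canPush = false;                      // like canPush above
  private pending: Array<string | null> = [];   // like messagesToPush
  private parseInFlight = false;                // like isPushPending
  private backlog: Array<Buffer | null> = [];   // like bufferedMessages

  constructor() {
    super({ objectMode: true });
  }

  // The consumer asks for data: flush anything parsed earlier, then allow
  // direct pushes again (the counterpart of _read plus consumeUnpushedMessages).
  _read() {
    this.canPush = true;
    while (this.pending.length > 0) {
      const next = this.pending.shift()!;
      if (!this.push(next) || next === null) {
        this.canPush = false;
        return;
      }
    }
  }

  // One call per decoded frame, in arrival order (the pushOrBufferMessage role).
  feed(frame: Buffer | null): void {
    if (this.parseInFlight) {
      this.backlog.push(frame); // keep arrival order while a parse is running
    } else {
      void this.parseAndPush(frame);
    }
  }

  // The pushMessage role: parse one frame, respect backpressure, then drain
  // whatever queued up behind it.
  private async parseAndPush(frame: Buffer | null): Promise<void> {
    if (frame === null) {
      // End of stream: only signal EOF once everything before it was delivered.
      if (this.canPush) {
        this.push(null);
      } else {
        this.pending.push(null);
      }
      return;
    }
    this.parseInFlight = true;
    const message = await slowParse(frame);
    if (this.canPush) {
      if (!this.push(message)) {
        this.canPush = false;
      }
    } else {
      this.pending.push(message);
    }
    this.parseInFlight = false;
    if (this.backlog.length > 0) {
      void this.parseAndPush(this.backlog.shift()!);
    }
  }
}

const source = new MessageSource();
source.on('data', (msg: string) => console.log('got', msg));
['first', 'second message', 'third'].forEach((s) => source.feed(Buffer.from(s)));
source.feed(null); // prints first, second message, third in order, then ends

Serializing the parses through the backlog is what preserves ordering; the cost is that at most one deserialization runs at a time, which is the same trade-off the isPushPending/bufferedMessages pair makes in the diff above.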