mirror of https://github.com/grpc/grpc-node.git
grpc-js: handle multiple messages in single 'data' event
This commit adds support for receiving multiple messages in a single 'data' event from the underlying HTTP2 stream. It also handles potential out of order messages due to asynchronous deserialization of messages.
This commit is contained in:
parent
4cf2c67305
commit
c3d7453a84
|
|
@ -117,6 +117,10 @@ export class ServerReadableStreamImpl<RequestType, ResponseType>
|
||||||
}
|
}
|
||||||
|
|
||||||
_read(size: number) {
|
_read(size: number) {
|
||||||
|
if (!this.call.consumeUnpushedMessages(this)) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
this.call.resume();
|
this.call.resume();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -324,6 +328,10 @@ export class Http2ServerCallStream<
|
||||||
deadline: NodeJS.Timer = noopTimer;
|
deadline: NodeJS.Timer = noopTimer;
|
||||||
private wantTrailers = false;
|
private wantTrailers = false;
|
||||||
private metadataSent = false;
|
private metadataSent = false;
|
||||||
|
private canPush = false;
|
||||||
|
private isPushPending = false;
|
||||||
|
private bufferedMessages: Array<Buffer | null> = [];
|
||||||
|
private messagesToPush: Array<RequestType | null> = [];
|
||||||
|
|
||||||
constructor(
|
constructor(
|
||||||
private stream: http2.ServerHttp2Stream,
|
private stream: http2.ServerHttp2Stream,
|
||||||
|
|
@ -531,38 +539,98 @@ export class Http2ServerCallStream<
|
||||||
) {
|
) {
|
||||||
const decoder = new StreamDecoder();
|
const decoder = new StreamDecoder();
|
||||||
|
|
||||||
/* This code here is wrong but getting the client working is the priority
|
|
||||||
* right now and I'm not going to block that on fixing what is currently
|
|
||||||
* unused code. If multiple messages come in with a single frame, this will
|
|
||||||
* keep calling readable.push after it has returned false, which should not
|
|
||||||
* happen. Independent of that, deserializeMessage is asynchronous, which
|
|
||||||
* means that incoming messages could be reordered when emitted to the
|
|
||||||
* application. That effect will become more pronounced when compression
|
|
||||||
* support is added and deserializeMessage takes longer by an amount of
|
|
||||||
* time dependent on the size of the message. A system like the one in
|
|
||||||
* call-stream.ts should be added to buffer incoming messages and
|
|
||||||
* preserve ordering more strongly */
|
|
||||||
|
|
||||||
this.stream.on('data', async (data: Buffer) => {
|
this.stream.on('data', async (data: Buffer) => {
|
||||||
const messages = decoder.write(data);
|
const messages = decoder.write(data);
|
||||||
for (const message of messages) {
|
|
||||||
try {
|
|
||||||
const deserialized = await this.deserializeMessage(message);
|
|
||||||
|
|
||||||
if (!readable.push(deserialized)) {
|
for (const message of messages) {
|
||||||
this.stream.pause();
|
this.pushOrBufferMessage(readable, message);
|
||||||
}
|
|
||||||
} catch (err) {
|
|
||||||
err.code = Status.INTERNAL;
|
|
||||||
readable.emit('error', err);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
this.stream.once('end', () => {
|
this.stream.once('end', () => {
|
||||||
readable.push(null);
|
this.pushOrBufferMessage(readable, null);
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
consumeUnpushedMessages(
|
||||||
|
readable:
|
||||||
|
| ServerReadableStream<RequestType, ResponseType>
|
||||||
|
| ServerDuplexStream<RequestType, ResponseType>
|
||||||
|
): boolean {
|
||||||
|
this.canPush = true;
|
||||||
|
|
||||||
|
while (this.messagesToPush.length > 0) {
|
||||||
|
const nextMessage = this.messagesToPush.shift();
|
||||||
|
const canPush = readable.push(nextMessage);
|
||||||
|
|
||||||
|
if (nextMessage === null || canPush === false) {
|
||||||
|
this.canPush = false;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return this.canPush;
|
||||||
|
}
|
||||||
|
|
||||||
|
private pushOrBufferMessage(
|
||||||
|
readable:
|
||||||
|
| ServerReadableStream<RequestType, ResponseType>
|
||||||
|
| ServerDuplexStream<RequestType, ResponseType>,
|
||||||
|
messageBytes: Buffer | null
|
||||||
|
): void {
|
||||||
|
if (this.isPushPending) {
|
||||||
|
this.bufferedMessages.push(messageBytes);
|
||||||
|
} else {
|
||||||
|
this.pushMessage(readable, messageBytes);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private async pushMessage(
|
||||||
|
readable:
|
||||||
|
| ServerReadableStream<RequestType, ResponseType>
|
||||||
|
| ServerDuplexStream<RequestType, ResponseType>,
|
||||||
|
messageBytes: Buffer | null
|
||||||
|
) {
|
||||||
|
if (messageBytes === null) {
|
||||||
|
if (this.canPush) {
|
||||||
|
readable.push(null);
|
||||||
|
} else {
|
||||||
|
this.messagesToPush.push(null);
|
||||||
|
}
|
||||||
|
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
this.isPushPending = true;
|
||||||
|
|
||||||
|
try {
|
||||||
|
const deserialized = await this.deserializeMessage(messageBytes);
|
||||||
|
|
||||||
|
if (this.canPush) {
|
||||||
|
if (!readable.push(deserialized)) {
|
||||||
|
this.canPush = false;
|
||||||
|
this.stream.pause();
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
this.messagesToPush.push(deserialized);
|
||||||
|
}
|
||||||
|
} catch (err) {
|
||||||
|
// Ignore any remaining messages when errors occur.
|
||||||
|
this.bufferedMessages.length = 0;
|
||||||
|
|
||||||
|
err.code = Status.INTERNAL;
|
||||||
|
readable.emit('error', err);
|
||||||
|
}
|
||||||
|
|
||||||
|
this.isPushPending = false;
|
||||||
|
|
||||||
|
if (this.bufferedMessages.length > 0) {
|
||||||
|
this.pushMessage(
|
||||||
|
readable,
|
||||||
|
this.bufferedMessages.shift() as Buffer | null
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// tslint:disable:no-any
|
// tslint:disable:no-any
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue