Merge pull request #2193 from murgatroid99/grpc-js_stream_flow_after_end

grpc-js: Drain incoming http2 data after outputting status
This commit is contained in:
Michael Lumish 2022-08-15 13:15:51 -07:00 committed by GitHub
commit a5429ad883
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 11 additions and 4 deletions

View File

@ -1,6 +1,6 @@
{ {
"name": "@grpc/grpc-js", "name": "@grpc/grpc-js",
"version": "1.6.9", "version": "1.6.10",
"description": "gRPC Library for Node - pure JS implementation", "description": "gRPC Library for Node - pure JS implementation",
"homepage": "https://grpc.io/", "homepage": "https://grpc.io/",
"repository": "https://github.com/grpc/grpc-node/tree/master/packages/grpc-js", "repository": "https://github.com/grpc/grpc-node/tree/master/packages/grpc-js",

View File

@ -329,6 +329,11 @@ export class Http2CallStream implements Call {
process.nextTick(() => { process.nextTick(() => {
this.listener?.onReceiveStatus(filteredStatus); this.listener?.onReceiveStatus(filteredStatus);
}); });
/* Leave the http2 stream in flowing state to drain incoming messages, to
* ensure that the stream closure completes. The call stream already does
* not push more messages after the status is output, so the messages go
* nowhere either way. */
this.http2Stream?.resume();
if (this.subchannel) { if (this.subchannel) {
this.subchannel.callUnref(); this.subchannel.callUnref();
this.subchannel.removeDisconnectListener(this.disconnectListener); this.subchannel.removeDisconnectListener(this.disconnectListener);
@ -577,6 +582,11 @@ export class Http2CallStream implements Call {
this.handleTrailers(headers); this.handleTrailers(headers);
}); });
stream.on('data', (data: Buffer) => { stream.on('data', (data: Buffer) => {
/* If the status has already been output, allow the http2 stream to
* drain without processing the data. */
if (this.statusOutput) {
return;
}
this.trace('receive HTTP/2 data frame of length ' + data.length); this.trace('receive HTTP/2 data frame of length ' + data.length);
const messages = this.decoder.write(data); const messages = this.decoder.write(data);
@ -688,9 +698,6 @@ export class Http2CallStream implements Call {
} }
this.streamEndWatchers.forEach(watcher => watcher(false)); this.streamEndWatchers.forEach(watcher => watcher(false));
}); });
if (!this.pendingRead) {
stream.pause();
}
if (this.pendingWrite) { if (this.pendingWrite) {
if (!this.pendingWriteCallback) { if (!this.pendingWriteCallback) {
throw new Error('Invalid state in write handling code'); throw new Error('Invalid state in write handling code');