diff --git a/packages/grpc-js/package.json b/packages/grpc-js/package.json index b10ad4a4..72b4b10e 100644 --- a/packages/grpc-js/package.json +++ b/packages/grpc-js/package.json @@ -1,6 +1,6 @@ { "name": "@grpc/grpc-js", - "version": "1.6.9", + "version": "1.6.10", "description": "gRPC Library for Node - pure JS implementation", "homepage": "https://grpc.io/", "repository": "https://github.com/grpc/grpc-node/tree/master/packages/grpc-js", diff --git a/packages/grpc-js/src/call-stream.ts b/packages/grpc-js/src/call-stream.ts index f265512f..c405405e 100644 --- a/packages/grpc-js/src/call-stream.ts +++ b/packages/grpc-js/src/call-stream.ts @@ -329,6 +329,11 @@ export class Http2CallStream implements Call { process.nextTick(() => { this.listener?.onReceiveStatus(filteredStatus); }); + /* Leave the http2 stream in flowing state to drain incoming messages, to + * ensure that the stream closure completes. The call stream already does + * not push more messages after the status is output, so the messages go + * nowhere either way. */ + this.http2Stream?.resume(); if (this.subchannel) { this.subchannel.callUnref(); this.subchannel.removeDisconnectListener(this.disconnectListener); @@ -577,6 +582,11 @@ export class Http2CallStream implements Call { this.handleTrailers(headers); }); stream.on('data', (data: Buffer) => { + /* If the status has already been output, allow the http2 stream to + * drain without processing the data. */ + if (this.statusOutput) { + return; + } this.trace('receive HTTP/2 data frame of length ' + data.length); const messages = this.decoder.write(data); @@ -688,9 +698,6 @@ export class Http2CallStream implements Call { } this.streamEndWatchers.forEach(watcher => watcher(false)); }); - if (!this.pendingRead) { - stream.pause(); - } if (this.pendingWrite) { if (!this.pendingWriteCallback) { throw new Error('Invalid state in write handling code');