diff --git a/packages/grpc-js/src/call-stream.ts b/packages/grpc-js/src/call-stream.ts index 1bc43e14..c7def569 100644 --- a/packages/grpc-js/src/call-stream.ts +++ b/packages/grpc-js/src/call-stream.ts @@ -259,8 +259,11 @@ export class Http2CallStream implements Call { } private push(message: Buffer): void { - this.listener!.onReceiveMessage(message); - this.maybeOutputStatus(); + this.canPush = false; + process.nextTick(() => { + this.listener!.onReceiveMessage(message); + this.maybeOutputStatus(); + }); } private handleFilterError(error: Error) { @@ -276,9 +279,8 @@ export class Http2CallStream implements Call { } this.isReadFilterPending = false; if (this.canPush) { - this.push(message); - this.canPush = false; this.http2Stream!.pause(); + this.push(message); } else { this.unpushedReadMessages.push(message); } @@ -315,7 +317,6 @@ export class Http2CallStream implements Call { } private handleTrailers(headers: http2.IncomingHttpHeaders) { - this.readsClosed = true; const code: Status = this.mappedStatusCode; const details = ''; let metadata: Metadata; @@ -527,6 +528,7 @@ export class Http2CallStream implements Call { /* If we have already emitted a status, we should not emit any more * messages and we should communicate that the stream has ended */ if (this.finalStatus !== null) { + this.readsClosed = true; this.maybeOutputStatus(); return; } @@ -537,12 +539,10 @@ export class Http2CallStream implements Call { if (this.unpushedReadMessages.length > 0) { const nextMessage: Buffer = this.unpushedReadMessages.shift()!; this.push(nextMessage); - this.canPush = false; return; } /* Only resume reading from the http2Stream if we don't have any pending - * messages to emit, and we haven't gotten the signal to stop pushing - * messages */ + * messages to emit */ this.http2Stream.resume(); } }