Push messages to reader asynchronously.

This commit is contained in:
murgatroid99 2019-11-18 11:25:20 -08:00
parent 90ce40f91c
commit 4409ef8dfd
1 changed files with 8 additions and 8 deletions

View File

@ -259,8 +259,11 @@ export class Http2CallStream implements Call {
} }
private push(message: Buffer): void { private push(message: Buffer): void {
this.listener!.onReceiveMessage(message); this.canPush = false;
this.maybeOutputStatus(); process.nextTick(() => {
this.listener!.onReceiveMessage(message);
this.maybeOutputStatus();
});
} }
private handleFilterError(error: Error) { private handleFilterError(error: Error) {
@ -276,9 +279,8 @@ export class Http2CallStream implements Call {
} }
this.isReadFilterPending = false; this.isReadFilterPending = false;
if (this.canPush) { if (this.canPush) {
this.push(message);
this.canPush = false;
this.http2Stream!.pause(); this.http2Stream!.pause();
this.push(message);
} else { } else {
this.unpushedReadMessages.push(message); this.unpushedReadMessages.push(message);
} }
@ -315,7 +317,6 @@ export class Http2CallStream implements Call {
} }
private handleTrailers(headers: http2.IncomingHttpHeaders) { private handleTrailers(headers: http2.IncomingHttpHeaders) {
this.readsClosed = true;
const code: Status = this.mappedStatusCode; const code: Status = this.mappedStatusCode;
const details = ''; const details = '';
let metadata: Metadata; let metadata: Metadata;
@ -527,6 +528,7 @@ export class Http2CallStream implements Call {
/* If we have already emitted a status, we should not emit any more /* If we have already emitted a status, we should not emit any more
* messages and we should communicate that the stream has ended */ * messages and we should communicate that the stream has ended */
if (this.finalStatus !== null) { if (this.finalStatus !== null) {
this.readsClosed = true;
this.maybeOutputStatus(); this.maybeOutputStatus();
return; return;
} }
@ -537,12 +539,10 @@ export class Http2CallStream implements Call {
if (this.unpushedReadMessages.length > 0) { if (this.unpushedReadMessages.length > 0) {
const nextMessage: Buffer = this.unpushedReadMessages.shift()!; const nextMessage: Buffer = this.unpushedReadMessages.shift()!;
this.push(nextMessage); this.push(nextMessage);
this.canPush = false;
return; return;
} }
/* Only resume reading from the http2Stream if we don't have any pending /* Only resume reading from the http2Stream if we don't have any pending
* messages to emit, and we haven't gotten the signal to stop pushing * messages to emit */
* messages */
this.http2Stream.resume(); this.http2Stream.resume();
} }
} }