Fix dropped messages when multiple arrived in one HTTP/2 frame

This commit is contained in:
murgatroid99 2019-06-03 18:21:39 -07:00
parent 9c274034d0
commit afb7b4a602
3 changed files with 29 additions and 19 deletions

View File

@ -289,9 +289,9 @@ export class Http2CallStream extends Duplex implements Call {
}); });
stream.on('trailers', this.handleTrailers.bind(this)); stream.on('trailers', this.handleTrailers.bind(this));
stream.on('data', (data: Buffer) => { stream.on('data', (data: Buffer) => {
const message = this.decoder.write(data); const messages = this.decoder.write(data);
if (message !== null) { for (const message of messages) {
this.tryPush(message); this.tryPush(message);
} }
}); });

View File

@ -531,22 +531,31 @@ export class Http2ServerCallStream<
) { ) {
const decoder = new StreamDecoder(); const decoder = new StreamDecoder();
/* This code here is wrong but getting the client working is the priority
* right now and I'm not going to block that on fixing what is currently
* unused code. If multiple messages come in with a single frame, this will
* keep calling readable.push after it has returned false, which should not
* happen. Independent of that, deserializeMessage is asynchronous, which
* means that incoming messages could be reordered when emitted to the
* application. That effect will become more pronounced when compression
* support is added and deserializeMessage takes longer by an amount of
* time dependent on the size of the message. A system like the one in
* call-stream.ts should be added to buffer incoming messages and
* preserve ordering more strongly */
this.stream.on('data', async (data: Buffer) => { this.stream.on('data', async (data: Buffer) => {
const message = decoder.write(data); const messages = decoder.write(data);
for (const message of messages) {
try {
const deserialized = await this.deserializeMessage(message);
if (message === null) { if (!readable.push(deserialized)) {
return; this.stream.pause();
} }
} catch (err) {
try { err.code = Status.INTERNAL;
const deserialized = await this.deserializeMessage(message); readable.emit('error', err);
if (!readable.push(deserialized)) {
this.stream.pause();
} }
} catch (err) {
err.code = Status.INTERNAL;
readable.emit('error', err);
} }
}); });

View File

@ -30,9 +30,10 @@ export class StreamDecoder {
private readPartialMessage: Buffer[] = []; private readPartialMessage: Buffer[] = [];
private readMessageRemaining = 0; private readMessageRemaining = 0;
write(data: Buffer): Buffer | null { write(data: Buffer): Buffer[] {
let readHead = 0; let readHead = 0;
let toRead: number; let toRead: number;
let result: Buffer[] = [];
while (readHead < data.length) { while (readHead < data.length) {
switch (this.readState) { switch (this.readState) {
@ -69,7 +70,7 @@ export class StreamDecoder {
); );
this.readState = ReadState.NO_DATA; this.readState = ReadState.NO_DATA;
return message; result.push(message);
} }
} }
break; break;
@ -91,7 +92,7 @@ export class StreamDecoder {
); );
this.readState = ReadState.NO_DATA; this.readState = ReadState.NO_DATA;
return framedMessage; result.push(framedMessage);
} }
break; break;
default: default:
@ -99,6 +100,6 @@ export class StreamDecoder {
} }
} }
return null; return result;
} }
} }