fix: make stream.write() synchronous in server-call

This commit is contained in:
Alexander Fenster 2020-04-22 13:58:26 -07:00
parent ec82d9c72b
commit ebfc5c9594
3 changed files with 36 additions and 8 deletions

View File

@ -172,14 +172,14 @@ export class ServerWritableStreamImpl<RequestType, ResponseType>
this.call.sendMetadata(responseMetadata);
}
async _write(
_write(
chunk: ResponseType,
encoding: string,
// eslint-disable-next-line @typescript-eslint/no-explicit-any
callback: (...args: any[]) => void
) {
try {
const response = await this.call.serializeMessage(chunk);
const response = this.call.serializeMessage(chunk);
if (!this.call.write(response)) {
this.call.once('drain', callback);
@ -454,7 +454,7 @@ export class Http2ServerCallStream<
resolve();
}
resolve(await this.deserializeMessage(requestBytes));
resolve(this.deserializeMessage(requestBytes));
} catch (err) {
err.code = Status.INTERNAL;
this.sendError(err);
@ -476,7 +476,7 @@ export class Http2ServerCallStream<
return output;
}
async deserializeMessage(bytes: Buffer) {
deserializeMessage(bytes: Buffer) {
// TODO(cjihrig): Call compression aware deserializeMessage().
const receivedMessage = bytes.slice(5);
@ -505,7 +505,7 @@ export class Http2ServerCallStream<
}
try {
const response = await this.serializeMessage(value!);
const response = this.serializeMessage(value!);
this.write(response);
this.sendStatus({ code: Status.OK, details: 'OK', metadata });

View File

@ -20,6 +20,7 @@ syntax = "proto3";
message Request {
bool error = 1;
string message = 2;
int32 errorAfter = 3;
}
message Response {

View File

@ -347,11 +347,20 @@ describe('Other conditions', () => {
metadata: trailerMetadata,
});
} else {
for (let i = 0; i < 5; i++) {
for (let i = 1; i <= 5; i++) {
stream.write({ count: i });
if (req.errorAfter && req.errorAfter === i) {
stream.emit('error', {
code: grpc.status.UNKNOWN,
details: req.message || 'Requested error',
metadata: trailerMetadata,
});
break;
}
}
if (!req.errorAfter) {
stream.end(trailerMetadata);
}
stream.end(trailerMetadata);
}
},
@ -712,6 +721,24 @@ describe('Other conditions', () => {
);
});
});
describe('should handle server stream errors correctly', () => {
it('should emit data for all messages before error', (done) => {
const expectedDataCount = 2;
const call = client.serverStream({ errorAfter: expectedDataCount });
let actualDataCount = 0;
call.on('data', () => {
++actualDataCount;
});
call.on('error', (error: ServiceError) => {
assert.strictEqual(error.code, grpc.status.UNKNOWN);
assert.strictEqual(error.details, 'Requested error');
assert.strictEqual(actualDataCount, expectedDataCount);
done();
});
});
});
});
function identity(arg: any): any {