diff --git a/packages/grpc-js/src/server-call.ts b/packages/grpc-js/src/server-call.ts index e7377b56..f7f3f1ff 100644 --- a/packages/grpc-js/src/server-call.ts +++ b/packages/grpc-js/src/server-call.ts @@ -172,14 +172,14 @@ export class ServerWritableStreamImpl this.call.sendMetadata(responseMetadata); } - async _write( + _write( chunk: ResponseType, encoding: string, // eslint-disable-next-line @typescript-eslint/no-explicit-any callback: (...args: any[]) => void ) { try { - const response = await this.call.serializeMessage(chunk); + const response = this.call.serializeMessage(chunk); if (!this.call.write(response)) { this.call.once('drain', callback); @@ -454,7 +454,7 @@ export class Http2ServerCallStream< resolve(); } - resolve(await this.deserializeMessage(requestBytes)); + resolve(this.deserializeMessage(requestBytes)); } catch (err) { err.code = Status.INTERNAL; this.sendError(err); @@ -476,7 +476,7 @@ export class Http2ServerCallStream< return output; } - async deserializeMessage(bytes: Buffer) { + deserializeMessage(bytes: Buffer) { // TODO(cjihrig): Call compression aware deserializeMessage(). const receivedMessage = bytes.slice(5); @@ -505,7 +505,7 @@ export class Http2ServerCallStream< } try { - const response = await this.serializeMessage(value!); + const response = this.serializeMessage(value!); this.write(response); this.sendStatus({ code: Status.OK, details: 'OK', metadata }); diff --git a/packages/grpc-js/test/fixtures/test_service.proto b/packages/grpc-js/test/fixtures/test_service.proto index db876be9..f99393d1 100644 --- a/packages/grpc-js/test/fixtures/test_service.proto +++ b/packages/grpc-js/test/fixtures/test_service.proto @@ -20,6 +20,7 @@ syntax = "proto3"; message Request { bool error = 1; string message = 2; + int32 errorAfter = 3; } message Response { diff --git a/packages/grpc-js/test/test-server-errors.ts b/packages/grpc-js/test/test-server-errors.ts index 7c611b9b..91b7c196 100644 --- a/packages/grpc-js/test/test-server-errors.ts +++ b/packages/grpc-js/test/test-server-errors.ts @@ -347,11 +347,20 @@ describe('Other conditions', () => { metadata: trailerMetadata, }); } else { - for (let i = 0; i < 5; i++) { + for (let i = 1; i <= 5; i++) { stream.write({ count: i }); + if (req.errorAfter && req.errorAfter === i) { + stream.emit('error', { + code: grpc.status.UNKNOWN, + details: req.message || 'Requested error', + metadata: trailerMetadata, + }); + break; + } + } + if (!req.errorAfter) { + stream.end(trailerMetadata); } - - stream.end(trailerMetadata); } }, @@ -712,6 +721,24 @@ describe('Other conditions', () => { ); }); }); + + describe('should handle server stream errors correctly', () => { + it('should emit data for all messages before error', (done) => { + const expectedDataCount = 2; + const call = client.serverStream({ errorAfter: expectedDataCount }); + + let actualDataCount = 0; + call.on('data', () => { + ++actualDataCount; + }); + call.on('error', (error: ServiceError) => { + assert.strictEqual(error.code, grpc.status.UNKNOWN); + assert.strictEqual(error.details, 'Requested error'); + assert.strictEqual(actualDataCount, expectedDataCount); + done(); + }); + }); + }); }); function identity(arg: any): any {