From b8af8c94744ac04ccd4d1ee124419c6567b1b965 Mon Sep 17 00:00:00 2001 From: cjihrig Date: Tue, 7 May 2019 14:23:34 -0400 Subject: [PATCH] grpc-js: add client streaming RPC support This commit adds client streaming RPC support. --- packages/grpc-js/src/server-call.ts | 44 ++++++++++ packages/grpc-js/src/server.ts | 17 +++- packages/grpc-js/test/test-server-errors.ts | 97 +++++++++++++++++++++ packages/grpc-js/test/test-server.ts | 10 +++ 4 files changed, 167 insertions(+), 1 deletion(-) diff --git a/packages/grpc-js/src/server-call.ts b/packages/grpc-js/src/server-call.ts index 9528c30c..0e17ceec 100644 --- a/packages/grpc-js/src/server-call.ts +++ b/packages/grpc-js/src/server-call.ts @@ -24,6 +24,7 @@ import {StatusObject} from './call-stream'; import {Status} from './constants'; import {Deserialize, Serialize} from './make-client'; import {Metadata} from './metadata'; +import {StreamDecoder} from './stream-decoder'; function noop(): void {} @@ -100,6 +101,7 @@ export class ServerUnaryCallImpl extends EventEmitter export class ServerReadableStreamImpl extends Readable implements ServerReadableStream { cancelled: boolean; + private decoder: StreamDecoder; constructor( private call: Http2ServerCallStream, @@ -107,6 +109,44 @@ export class ServerReadableStreamImpl extends private _deserialize: Deserialize) { super({objectMode: true}); this.cancelled = false; + this.decoder = new StreamDecoder(); + + const http2Stream = this.call._getHttp2Stream(); + + http2Stream.on('data', async (data: Buffer) => { + const message = this.decoder.write(data); + + if (message === null) { + return; + } + + try { + const deserialized = await this.call.deserializeMessage(message); + + if (!this.push(deserialized)) { + http2Stream.pause(); + } + } catch (err) { + err.code = Status.INTERNAL; + this.emit('error', err); + } + }); + + http2Stream.once('end', () => { + this.push(null); + }); + } + + _read(size: number) { + this.call._getHttp2Stream().resume(); + } + + deserialize(input: Buffer): RequestType|null { + if (input === null || input === undefined) { + return null; + } + + return this._deserialize(input); } getPeer(): string { @@ -468,6 +508,10 @@ export class Http2ServerCallStream extends this.sendMetadata(); return this.stream.write(chunk); } + + _getHttp2Stream(): http2.ServerHttp2Stream { + return this.stream; + } } // tslint:disable:no-any diff --git a/packages/grpc-js/src/server.ts b/packages/grpc-js/src/server.ts index 421c089d..68f6e149 100644 --- a/packages/grpc-js/src/server.ts +++ b/packages/grpc-js/src/server.ts @@ -314,7 +314,22 @@ function handleClientStreaming( call: Http2ServerCallStream, handler: ClientStreamingHandler, metadata: Metadata): void { - throw new Error('not implemented yet'); + const stream = new ServerReadableStreamImpl( + call, metadata, handler.deserialize); + + function respond( + err: ServiceError|null, value: ResponseType|null, trailer?: Metadata, + flags?: number) { + stream.destroy(); + call.sendUnaryMessage(err, value, trailer, flags); + } + + if (call.cancelled) { + return; + } + + stream.on('error', respond); + handler.func(stream, respond); } diff --git a/packages/grpc-js/test/test-server-errors.ts b/packages/grpc-js/test/test-server-errors.ts index eabae3aa..6f352f73 100644 --- a/packages/grpc-js/test/test-server-errors.ts +++ b/packages/grpc-js/test/test-server-errors.ts @@ -126,6 +126,17 @@ describe('Client malformed response handling', () => { }); }); + it('should get an INTERNAL status with a client stream call', (done) => { + const call = client.clientStream((err: ServiceError, data: any) => { + assert(err); + assert.strictEqual(err.code, grpc.status.INTERNAL); + done(); + }); + + call.write({}); + call.end(); + }); + it('should get an INTERNAL status with a server stream call', (done) => { const call = client.serverStream({}); @@ -229,6 +240,17 @@ describe('Server serialization failure handling', () => { }); }); + it('should get an INTERNAL status with a client stream call', (done) => { + const call = client.clientStream((err: ServiceError, data: any) => { + assert(err); + assert.strictEqual(err.code, grpc.status.INTERNAL); + done(); + }); + + call.write({}); + call.end(); + }); + it('should get an INTERNAL status with a server stream call', (done) => { const call = client.serverStream({}); @@ -397,6 +419,18 @@ describe('Other conditions', () => { }); }); + it('should respond correctly to a client stream', (done) => { + const call = + misbehavingClient.clientStream((err: ServiceError, data: any) => { + assert(err); + assert.strictEqual(err.code, grpc.status.INTERNAL); + done(); + }); + + call.write(badArg); + call.end(); + }); + it('should respond correctly to a server stream', (done) => { const call = misbehavingClient.serverStream(badArg); @@ -457,6 +491,56 @@ describe('Other conditions', () => { }); }); + it('should be present when a client stream call succeeds', (done) => { + let count = 0; + const call = client.clientStream((err: ServiceError, data: any) => { + assert.ifError(err); + + count++; + if (count === 2) { + done(); + } + }); + + call.write({error: false}); + call.write({error: false}); + call.end(); + + call.on('status', (status: grpc.StatusObject) => { + assert.deepStrictEqual(status.metadata.get('trailer-present'), ['yes']); + + count++; + if (count === 2) { + done(); + } + }); + }); + + it('should be present when a client stream call fails', (done) => { + let count = 0; + const call = client.clientStream((err: ServiceError, data: any) => { + assert(err); + + count++; + if (count === 2) { + done(); + } + }); + + call.write({error: false}); + call.write({error: true}); + call.end(); + + call.on('status', (status: grpc.StatusObject) => { + assert.deepStrictEqual(status.metadata.get('trailer-present'), ['yes']); + + count++; + if (count === 2) { + done(); + } + }); + }); + it('should be present when a server stream call succeeds', (done) => { const call = client.serverStream({error: false}); @@ -489,6 +573,19 @@ describe('Other conditions', () => { }); }); + it('for a client stream call', (done) => { + const call = client.clientStream((err: ServiceError, data: any) => { + assert(err); + assert.strictEqual(err.code, grpc.status.UNKNOWN); + assert.strictEqual(err.details, 'Requested error'); + done(); + }); + + call.write({error: false}); + call.write({error: true}); + call.end(); + }); + it('for a server stream call', (done) => { const call = client.serverStream({error: true}); diff --git a/packages/grpc-js/test/test-server.ts b/packages/grpc-js/test/test-server.ts index f44f5d5d..d310ff07 100644 --- a/packages/grpc-js/test/test-server.ts +++ b/packages/grpc-js/test/test-server.ts @@ -263,6 +263,16 @@ describe('Server', () => { }); }); + it('should respond to a client stream with UNIMPLEMENTED', (done) => { + const call = client.sum((error: ServiceError, response: any) => { + assert(error); + assert.strictEqual(error.code, grpc.status.UNIMPLEMENTED); + done(); + }); + + call.end(); + }); + it('should respond to a server stream with UNIMPLEMENTED', (done) => { const call = client.fib({limit: 5});