From e0a30907f97f0c53510cffb499290877f990f4e2 Mon Sep 17 00:00:00 2001 From: cjihrig Date: Tue, 14 May 2019 15:07:12 -0400 Subject: [PATCH] grpc-js: add setupReadable() to Http2ServerCallStream This commit adds a setupReadable() method to Http2ServerCallStream. This is used to set up the plumbing between the HTTP2 stream and the surface readable/bidi calls. --- packages/grpc-js/src/server-call.ts | 62 +++++++++++++++-------------- 1 file changed, 32 insertions(+), 30 deletions(-) diff --git a/packages/grpc-js/src/server-call.ts b/packages/grpc-js/src/server-call.ts index 0e17ceec..dc651806 100644 --- a/packages/grpc-js/src/server-call.ts +++ b/packages/grpc-js/src/server-call.ts @@ -101,7 +101,6 @@ export class ServerUnaryCallImpl extends EventEmitter export class ServerReadableStreamImpl extends Readable implements ServerReadableStream { cancelled: boolean; - private decoder: StreamDecoder; constructor( private call: Http2ServerCallStream, @@ -109,36 +108,11 @@ export class ServerReadableStreamImpl extends private _deserialize: Deserialize) { super({objectMode: true}); this.cancelled = false; - this.decoder = new StreamDecoder(); - - const http2Stream = this.call._getHttp2Stream(); - - http2Stream.on('data', async (data: Buffer) => { - const message = this.decoder.write(data); - - if (message === null) { - return; - } - - try { - const deserialized = await this.call.deserializeMessage(message); - - if (!this.push(deserialized)) { - http2Stream.pause(); - } - } catch (err) { - err.code = Status.INTERNAL; - this.emit('error', err); - } - }); - - http2Stream.once('end', () => { - this.push(null); - }); + this.call.setupReadable(this); } _read(size: number) { - this.call._getHttp2Stream().resume(); + this.call.resume(); } deserialize(input: Buffer): RequestType|null { @@ -509,8 +483,36 @@ export class Http2ServerCallStream extends return this.stream.write(chunk); } - _getHttp2Stream(): http2.ServerHttp2Stream { - return this.stream; + resume() { + this.stream.resume(); + } + + setupReadable(readable: ServerReadableStream| + ServerDuplexStream) { + const decoder = new StreamDecoder(); + + this.stream.on('data', async (data: Buffer) => { + const message = decoder.write(data); + + if (message === null) { + return; + } + + try { + const deserialized = await this.deserializeMessage(message); + + if (!readable.push(deserialized)) { + this.stream.pause(); + } + } catch (err) { + err.code = Status.INTERNAL; + readable.emit('error', err); + } + }); + + this.stream.once('end', () => { + readable.push(null); + }); } }