grpc-js: add setupReadable() to Http2ServerCallStream

This commit adds a setupReadable() method to
Http2ServerCallStream. This is used to set up the plumbing
between the HTTP2 stream and the surface readable/bidi
calls.
This commit is contained in:
cjihrig 2019-05-14 15:07:12 -04:00
parent b8af8c9474
commit e0a30907f9
No known key found for this signature in database
GPG Key ID: 7434390BDBE9B9C5
1 changed files with 32 additions and 30 deletions

View File

@ -101,7 +101,6 @@ export class ServerUnaryCallImpl<RequestType, ResponseType> extends EventEmitter
export class ServerReadableStreamImpl<RequestType, ResponseType> extends
Readable implements ServerReadableStream<RequestType, ResponseType> {
cancelled: boolean;
private decoder: StreamDecoder;
constructor(
private call: Http2ServerCallStream<RequestType, ResponseType>,
@ -109,36 +108,11 @@ export class ServerReadableStreamImpl<RequestType, ResponseType> extends
private _deserialize: Deserialize<RequestType>) {
super({objectMode: true});
this.cancelled = false;
this.decoder = new StreamDecoder();
const http2Stream = this.call._getHttp2Stream();
http2Stream.on('data', async (data: Buffer) => {
const message = this.decoder.write(data);
if (message === null) {
return;
}
try {
const deserialized = await this.call.deserializeMessage(message);
if (!this.push(deserialized)) {
http2Stream.pause();
}
} catch (err) {
err.code = Status.INTERNAL;
this.emit('error', err);
}
});
http2Stream.once('end', () => {
this.push(null);
});
this.call.setupReadable(this);
}
_read(size: number) {
this.call._getHttp2Stream().resume();
this.call.resume();
}
deserialize(input: Buffer): RequestType|null {
@ -509,8 +483,36 @@ export class Http2ServerCallStream<RequestType, ResponseType> extends
return this.stream.write(chunk);
}
_getHttp2Stream(): http2.ServerHttp2Stream {
return this.stream;
resume() {
this.stream.resume();
}
setupReadable(readable: ServerReadableStream<RequestType, ResponseType>|
ServerDuplexStream<RequestType, ResponseType>) {
const decoder = new StreamDecoder();
this.stream.on('data', async (data: Buffer) => {
const message = decoder.write(data);
if (message === null) {
return;
}
try {
const deserialized = await this.deserializeMessage(message);
if (!readable.push(deserialized)) {
this.stream.pause();
}
} catch (err) {
err.code = Status.INTERNAL;
readable.emit('error', err);
}
});
this.stream.once('end', () => {
readable.push(null);
});
}
}