From ec9e82554b9586584a82aac13d66ce847b66172c Mon Sep 17 00:00:00 2001 From: cjihrig Date: Fri, 3 May 2019 11:40:52 -0400 Subject: [PATCH] fixup! grpc-js: support unary and server streaming rpcs --- packages/grpc-js/src/server-call.ts | 86 ++++++++++++----------------- packages/grpc-js/src/server.ts | 8 +-- 2 files changed, 40 insertions(+), 54 deletions(-) diff --git a/packages/grpc-js/src/server-call.ts b/packages/grpc-js/src/server-call.ts index 12bc6831..866b8889 100644 --- a/packages/grpc-js/src/server-call.ts +++ b/packages/grpc-js/src/server-call.ts @@ -100,13 +100,12 @@ export class ServerUnaryCallImpl extends EventEmitter export class ServerReadableStreamImpl extends Readable implements ServerReadableStream { cancelled: boolean; - private done = false; constructor( private call: Http2ServerCallStream, public metadata: Metadata, private _deserialize: Deserialize) { - super(); + super({objectMode: true}); this.cancelled = false; } @@ -117,11 +116,6 @@ export class ServerReadableStreamImpl extends sendMetadata(responseMetadata: Metadata): void { this.call.sendMetadata(responseMetadata); } - - _done(): void { - this.done = true; - this.on('data', noop); - } } @@ -129,6 +123,7 @@ export class ServerWritableStreamImpl extends Writable implements ServerWritableStream { cancelled: boolean; request: RequestType|null; + private trailingMetadata: Metadata; constructor( private call: Http2ServerCallStream, @@ -136,6 +131,7 @@ export class ServerWritableStreamImpl extends super({objectMode: true}); this.cancelled = false; this.request = null; + this.trailingMetadata = new Metadata(); this.on('error', (err) => { this.call.sendError(err as ServiceError); @@ -171,14 +167,15 @@ export class ServerWritableStreamImpl extends } _final(callback: Function): void { - this.call.sendStatus({code: Status.OK, details: 'OK'} as StatusObject); + this.call.sendStatus( + {code: Status.OK, details: 'OK', metadata: this.trailingMetadata}); callback(null); } // tslint:disable-next-line:no-any end(metadata?: any) { if (metadata) { - this.call.setMetadata(metadata); + this.trailingMetadata = metadata; } super.end(); @@ -202,7 +199,7 @@ export class ServerDuplexStreamImpl extends Duplex private call: Http2ServerCallStream, public metadata: Metadata, private _serialize: Serialize, private _deserialize: Deserialize) { - super(); + super({objectMode: true}); this.cancelled = false; } @@ -254,23 +251,24 @@ export type Handler = { export type HandlerType = 'bidi'|'clientStream'|'serverStream'|'unary'; +const noopTimer: NodeJS.Timer = setTimeout(() => {}, 0); // Internal class that wraps the HTTP2 request. export class Http2ServerCallStream extends EventEmitter { cancelled = false; - deadline: NodeJS.Timer|null = null; - private metadata: Metadata|null = null; + deadline: NodeJS.Timer = noopTimer; private wantTrailers = false; + private metadataSent = false; constructor( private stream: http2.ServerHttp2Stream, - private handler: Handler|null) { + private handler: Handler) { super(); this.stream.once('error', (err: ServiceError) => { err.code = Status.INTERNAL; - this.sendError(err as ServiceError); + this.sendError(err); }); this.stream.once('close', () => { @@ -279,21 +277,18 @@ export class Http2ServerCallStream extends this.emit('cancelled', 'cancelled'); } }); - } - setMetadata(metadata: Metadata): void { - this.metadata = metadata; - } - - private get _metadataSent(): boolean { - return this.stream.headersSent; + this.stream.on('drain', () => { + this.emit('drain'); + }); } sendMetadata(customMetadata?: Metadata) { - if (this._metadataSent) { + if (this.metadataSent) { return; } + this.metadataSent = true; const custom = customMetadata ? customMetadata.toHttp2Headers() : null; // TODO(cjihrig): Include compression headers. const headers = Object.assign(defaultResponseHeaders, custom); @@ -352,8 +347,7 @@ export class Http2ServerCallStream extends } serializeMessage(value: ResponseType) { - const handler = this.handler as Handler; - const messageBuffer = handler.serialize(value); + const messageBuffer = this.handler.serialize(value); // TODO(cjihrig): Call compression aware serializeMessage(). const byteLength = messageBuffer.byteLength; @@ -365,31 +359,30 @@ export class Http2ServerCallStream extends } async deserializeMessage(bytes: Buffer) { - const handler = this.handler as Handler; // TODO(cjihrig): Call compression aware deserializeMessage(). const receivedMessage = bytes.slice(5); - return handler.deserialize(receivedMessage); + return this.handler.deserialize(receivedMessage); } async sendUnaryMessage( err: ServiceError|null, value: ResponseType|null, metadata?: Metadata, flags?: number) { - if (err) { - if (metadata) { - err.metadata = metadata; - } + if (!metadata) { + metadata = new Metadata(); + } + if (err) { + err.metadata = metadata; this.sendError(err); return; } try { - const response = await this.serializeMessage(value as ResponseType); + const response = await this.serializeMessage(value!); this.write(response); - this.sendStatus( - {code: Status.OK, details: 'OK', metadata} as StatusObject); + this.sendStatus({code: Status.OK, details: 'OK', metadata}); } catch (err) { err.code = Status.INTERNAL; this.sendError(err); @@ -397,28 +390,21 @@ export class Http2ServerCallStream extends } sendStatus(statusObj: StatusObject) { - if (this.cancelled === true) { + if (this.cancelled) { return; } - if (this.deadline !== null) { - clearTimeout(this.deadline); - this.deadline = null; - } + clearTimeout(this.deadline); if (!this.wantTrailers) { this.wantTrailers = true; this.stream.once('wantTrailers', () => { - let trailersToSend = { - [GRPC_STATUS_HEADER]: statusObj.code, - [GRPC_MESSAGE_HEADER]: encodeURI(statusObj.details as string) - }; - const metadata = statusObj.metadata || this.metadata; - - if (metadata) { - trailersToSend = - Object.assign(trailersToSend, metadata.toHttp2Headers()); - } + const trailersToSend = Object.assign( + { + [GRPC_STATUS_HEADER]: statusObj.code, + [GRPC_MESSAGE_HEADER]: encodeURI(statusObj.details as string) + }, + statusObj.metadata.toHttp2Headers()); this.stream.sendTrailers(trailersToSend); }); @@ -433,7 +419,7 @@ export class Http2ServerCallStream extends details: error.hasOwnProperty('message') ? error.message : 'Unknown Error', metadata: error.hasOwnProperty('metadata') ? error.metadata : - this.metadata as Metadata + new Metadata() }; if (error.hasOwnProperty('code') && Number.isInteger(error.code)) { @@ -448,7 +434,7 @@ export class Http2ServerCallStream extends } write(chunk: Buffer) { - if (this.cancelled === true) { + if (this.cancelled) { return; } diff --git a/packages/grpc-js/src/server.ts b/packages/grpc-js/src/server.ts index 32bb52a2..050799d4 100644 --- a/packages/grpc-js/src/server.ts +++ b/packages/grpc-js/src/server.ts @@ -239,7 +239,7 @@ export class Server { 'stream', (stream: http2.ServerHttp2Stream, headers: http2.IncomingHttpHeaders) => { - if (this.started !== true) { + if (!this.started) { stream.end(); return; } @@ -273,7 +273,7 @@ export class Server { throw new Error(`Unknown handler type: ${handler.type}`); } } catch (err) { - const call = new Http2ServerCallStream(stream, null); + const call = new Http2ServerCallStream(stream, null!); err.code = Status.INTERNAL; call.sendError(err); } @@ -290,7 +290,7 @@ async function handleUnary( new ServerUnaryCallImpl(call, metadata); const request = await call.receiveUnaryMessage(); - if (request === undefined || call.cancelled === true) { + if (request === undefined || call.cancelled) { return; } @@ -317,7 +317,7 @@ async function handleServerStreaming( metadata: Metadata): Promise { const request = await call.receiveUnaryMessage(); - if (request === undefined || call.cancelled === true) { + if (request === undefined || call.cancelled) { return; }