fixup! grpc-js: support unary and server streaming rpcs

This commit is contained in:
cjihrig 2019-05-03 11:40:52 -04:00
parent c050bf5ad8
commit ec9e82554b
No known key found for this signature in database
GPG Key ID: 7434390BDBE9B9C5
2 changed files with 40 additions and 54 deletions

View File

@ -100,13 +100,12 @@ export class ServerUnaryCallImpl<RequestType, ResponseType> extends EventEmitter
export class ServerReadableStreamImpl<RequestType, ResponseType> extends
Readable implements ServerReadableStream<RequestType, ResponseType> {
cancelled: boolean;
private done = false;
constructor(
private call: Http2ServerCallStream<RequestType, ResponseType>,
public metadata: Metadata,
private _deserialize: Deserialize<RequestType>) {
super();
super({objectMode: true});
this.cancelled = false;
}
@ -117,11 +116,6 @@ export class ServerReadableStreamImpl<RequestType, ResponseType> extends
sendMetadata(responseMetadata: Metadata): void {
this.call.sendMetadata(responseMetadata);
}
_done(): void {
this.done = true;
this.on('data', noop);
}
}
@ -129,6 +123,7 @@ export class ServerWritableStreamImpl<RequestType, ResponseType> extends
Writable implements ServerWritableStream<RequestType, ResponseType> {
cancelled: boolean;
request: RequestType|null;
private trailingMetadata: Metadata;
constructor(
private call: Http2ServerCallStream<RequestType, ResponseType>,
@ -136,6 +131,7 @@ export class ServerWritableStreamImpl<RequestType, ResponseType> extends
super({objectMode: true});
this.cancelled = false;
this.request = null;
this.trailingMetadata = new Metadata();
this.on('error', (err) => {
this.call.sendError(err as ServiceError);
@ -171,14 +167,15 @@ export class ServerWritableStreamImpl<RequestType, ResponseType> extends
}
_final(callback: Function): void {
this.call.sendStatus({code: Status.OK, details: 'OK'} as StatusObject);
this.call.sendStatus(
{code: Status.OK, details: 'OK', metadata: this.trailingMetadata});
callback(null);
}
// tslint:disable-next-line:no-any
end(metadata?: any) {
if (metadata) {
this.call.setMetadata(metadata);
this.trailingMetadata = metadata;
}
super.end();
@ -202,7 +199,7 @@ export class ServerDuplexStreamImpl<RequestType, ResponseType> extends Duplex
private call: Http2ServerCallStream<RequestType, ResponseType>,
public metadata: Metadata, private _serialize: Serialize<ResponseType>,
private _deserialize: Deserialize<RequestType>) {
super();
super({objectMode: true});
this.cancelled = false;
}
@ -254,23 +251,24 @@ export type Handler<RequestType, ResponseType> = {
export type HandlerType = 'bidi'|'clientStream'|'serverStream'|'unary';
const noopTimer: NodeJS.Timer = setTimeout(() => {}, 0);
// Internal class that wraps the HTTP2 request.
export class Http2ServerCallStream<RequestType, ResponseType> extends
EventEmitter {
cancelled = false;
deadline: NodeJS.Timer|null = null;
private metadata: Metadata|null = null;
deadline: NodeJS.Timer = noopTimer;
private wantTrailers = false;
private metadataSent = false;
constructor(
private stream: http2.ServerHttp2Stream,
private handler: Handler<RequestType, ResponseType>|null) {
private handler: Handler<RequestType, ResponseType>) {
super();
this.stream.once('error', (err: ServiceError) => {
err.code = Status.INTERNAL;
this.sendError(err as ServiceError);
this.sendError(err);
});
this.stream.once('close', () => {
@ -279,21 +277,18 @@ export class Http2ServerCallStream<RequestType, ResponseType> extends
this.emit('cancelled', 'cancelled');
}
});
}
setMetadata(metadata: Metadata): void {
this.metadata = metadata;
}
private get _metadataSent(): boolean {
return this.stream.headersSent;
this.stream.on('drain', () => {
this.emit('drain');
});
}
sendMetadata(customMetadata?: Metadata) {
if (this._metadataSent) {
if (this.metadataSent) {
return;
}
this.metadataSent = true;
const custom = customMetadata ? customMetadata.toHttp2Headers() : null;
// TODO(cjihrig): Include compression headers.
const headers = Object.assign(defaultResponseHeaders, custom);
@ -352,8 +347,7 @@ export class Http2ServerCallStream<RequestType, ResponseType> extends
}
serializeMessage(value: ResponseType) {
const handler = this.handler as Handler<RequestType, ResponseType>;
const messageBuffer = handler.serialize(value);
const messageBuffer = this.handler.serialize(value);
// TODO(cjihrig): Call compression aware serializeMessage().
const byteLength = messageBuffer.byteLength;
@ -365,31 +359,30 @@ export class Http2ServerCallStream<RequestType, ResponseType> extends
}
async deserializeMessage(bytes: Buffer) {
const handler = this.handler as Handler<RequestType, ResponseType>;
// TODO(cjihrig): Call compression aware deserializeMessage().
const receivedMessage = bytes.slice(5);
return handler.deserialize(receivedMessage);
return this.handler.deserialize(receivedMessage);
}
async sendUnaryMessage(
err: ServiceError|null, value: ResponseType|null, metadata?: Metadata,
flags?: number) {
if (err) {
if (metadata) {
err.metadata = metadata;
}
if (!metadata) {
metadata = new Metadata();
}
if (err) {
err.metadata = metadata;
this.sendError(err);
return;
}
try {
const response = await this.serializeMessage(value as ResponseType);
const response = await this.serializeMessage(value!);
this.write(response);
this.sendStatus(
{code: Status.OK, details: 'OK', metadata} as StatusObject);
this.sendStatus({code: Status.OK, details: 'OK', metadata});
} catch (err) {
err.code = Status.INTERNAL;
this.sendError(err);
@ -397,28 +390,21 @@ export class Http2ServerCallStream<RequestType, ResponseType> extends
}
sendStatus(statusObj: StatusObject) {
if (this.cancelled === true) {
if (this.cancelled) {
return;
}
if (this.deadline !== null) {
clearTimeout(this.deadline);
this.deadline = null;
}
clearTimeout(this.deadline);
if (!this.wantTrailers) {
this.wantTrailers = true;
this.stream.once('wantTrailers', () => {
let trailersToSend = {
[GRPC_STATUS_HEADER]: statusObj.code,
[GRPC_MESSAGE_HEADER]: encodeURI(statusObj.details as string)
};
const metadata = statusObj.metadata || this.metadata;
if (metadata) {
trailersToSend =
Object.assign(trailersToSend, metadata.toHttp2Headers());
}
const trailersToSend = Object.assign(
{
[GRPC_STATUS_HEADER]: statusObj.code,
[GRPC_MESSAGE_HEADER]: encodeURI(statusObj.details as string)
},
statusObj.metadata.toHttp2Headers());
this.stream.sendTrailers(trailersToSend);
});
@ -433,7 +419,7 @@ export class Http2ServerCallStream<RequestType, ResponseType> extends
details: error.hasOwnProperty('message') ? error.message :
'Unknown Error',
metadata: error.hasOwnProperty('metadata') ? error.metadata :
this.metadata as Metadata
new Metadata()
};
if (error.hasOwnProperty('code') && Number.isInteger(error.code)) {
@ -448,7 +434,7 @@ export class Http2ServerCallStream<RequestType, ResponseType> extends
}
write(chunk: Buffer) {
if (this.cancelled === true) {
if (this.cancelled) {
return;
}

View File

@ -239,7 +239,7 @@ export class Server {
'stream',
(stream: http2.ServerHttp2Stream,
headers: http2.IncomingHttpHeaders) => {
if (this.started !== true) {
if (!this.started) {
stream.end();
return;
}
@ -273,7 +273,7 @@ export class Server {
throw new Error(`Unknown handler type: ${handler.type}`);
}
} catch (err) {
const call = new Http2ServerCallStream(stream, null);
const call = new Http2ServerCallStream(stream, null!);
err.code = Status.INTERNAL;
call.sendError(err);
}
@ -290,7 +290,7 @@ async function handleUnary<RequestType, ResponseType>(
new ServerUnaryCallImpl<RequestType, ResponseType>(call, metadata);
const request = await call.receiveUnaryMessage();
if (request === undefined || call.cancelled === true) {
if (request === undefined || call.cancelled) {
return;
}
@ -317,7 +317,7 @@ async function handleServerStreaming<RequestType, ResponseType>(
metadata: Metadata): Promise<void> {
const request = await call.receiveUnaryMessage();
if (request === undefined || call.cancelled === true) {
if (request === undefined || call.cancelled) {
return;
}