grpc-js: add sendStatus()

Based on PR feedback, this commit adds a sendStatus() method to
Http2ServerCallStream. All responses will be funnelled through
this method.
This commit is contained in:
cjihrig 2019-05-01 12:38:57 -04:00
parent a6e2edce9a
commit c050bf5ad8
No known key found for this signature in database
GPG Key ID: 7434390BDBE9B9C5
2 changed files with 68 additions and 60 deletions

View File

@ -20,6 +20,7 @@ import * as http2 from 'http2';
import {Duplex, Readable, Writable} from 'stream';
import {ServiceError} from './call';
import {StatusObject} from './call-stream';
import {Status} from './constants';
import {Deserialize, Serialize} from './make-client';
import {Metadata} from './metadata';
@ -170,14 +171,14 @@ export class ServerWritableStreamImpl<RequestType, ResponseType> extends
}
_final(callback: Function): void {
this.call.end();
this.call.sendStatus({code: Status.OK, details: 'OK'} as StatusObject);
callback(null);
}
// tslint:disable-next-line:no-any
end(metadata?: any) {
if (metadata) {
this.call.status.metadata = metadata;
this.call.setMetadata(metadata);
}
super.end();
@ -259,15 +260,17 @@ export class Http2ServerCallStream<RequestType, ResponseType> extends
EventEmitter {
cancelled = false;
deadline: NodeJS.Timer|null = null;
status: PartialServiceError = {code: Status.OK, details: 'OK'};
private metadata: Metadata|null = null;
private wantTrailers = false;
constructor(
private stream: http2.ServerHttp2Stream,
private handler: Handler<RequestType, ResponseType>|null) {
super();
this.stream.once('error', (err: Error) => {
this.sendError(err as ServiceError, Status.INTERNAL);
this.stream.once('error', (err: ServiceError) => {
err.code = Status.INTERNAL;
this.sendError(err as ServiceError);
});
this.stream.once('close', () => {
@ -278,6 +281,10 @@ export class Http2ServerCallStream<RequestType, ResponseType> extends
});
}
setMetadata(metadata: Metadata): void {
this.metadata = metadata;
}
private get _metadataSent(): boolean {
return this.stream.headersSent;
}
@ -290,22 +297,7 @@ export class Http2ServerCallStream<RequestType, ResponseType> extends
const custom = customMetadata ? customMetadata.toHttp2Headers() : null;
// TODO(cjihrig): Include compression headers.
const headers = Object.assign(defaultResponseHeaders, custom);
this.stream.respond(headers, defaultResponseOptions);
this.stream.once('wantTrailers', () => {
let trailersToSend = {
[GRPC_STATUS_HEADER]: this.status.code,
[GRPC_MESSAGE_HEADER]: encodeURI(this.status.details as string)
};
const metadata = this.status.metadata;
if (metadata) {
trailersToSend =
Object.assign(trailersToSend, metadata.toHttp2Headers());
}
this.stream.sendTrailers(trailersToSend);
});
}
receiveMetadata(headers: http2.IncomingHttpHeaders) {
@ -319,8 +311,9 @@ export class Http2ServerCallStream<RequestType, ResponseType> extends
const match = timeoutHeader[0].toString().match(DEADLINE_REGEX);
if (match === null) {
this.sendError(
new Error('Invalid deadline') as ServiceError, Status.OUT_OF_RANGE);
const err = new Error('Invalid deadline') as ServiceError;
err.code = Status.OUT_OF_RANGE;
this.sendError(err);
return;
}
@ -350,7 +343,8 @@ export class Http2ServerCallStream<RequestType, ResponseType> extends
resolve(await this.deserializeMessage(requestBytes));
} catch (err) {
this.sendError(err, Status.INTERNAL);
err.code = Status.INTERNAL;
this.sendError(err);
resolve();
}
});
@ -393,40 +387,64 @@ export class Http2ServerCallStream<RequestType, ResponseType> extends
try {
const response = await this.serializeMessage(value as ResponseType);
if (metadata) {
this.status.metadata = metadata;
}
this.end(response);
this.write(response);
this.sendStatus(
{code: Status.OK, details: 'OK', metadata} as StatusObject);
} catch (err) {
this.sendError(err, Status.INTERNAL);
err.code = Status.INTERNAL;
this.sendError(err);
}
}
sendError(error: ServiceError, code = Status.UNKNOWN) {
const {status} = this;
if (error.hasOwnProperty('message')) {
status.details = error.message;
} else {
status.details = 'Unknown Error';
sendStatus(statusObj: StatusObject) {
if (this.cancelled === true) {
return;
}
if (this.deadline !== null) {
clearTimeout(this.deadline);
this.deadline = null;
}
if (!this.wantTrailers) {
this.wantTrailers = true;
this.stream.once('wantTrailers', () => {
let trailersToSend = {
[GRPC_STATUS_HEADER]: statusObj.code,
[GRPC_MESSAGE_HEADER]: encodeURI(statusObj.details as string)
};
const metadata = statusObj.metadata || this.metadata;
if (metadata) {
trailersToSend =
Object.assign(trailersToSend, metadata.toHttp2Headers());
}
this.stream.sendTrailers(trailersToSend);
});
this.sendMetadata();
this.stream.end();
}
}
sendError(error: ServiceError) {
const status: StatusObject = {
code: Status.UNKNOWN,
details: error.hasOwnProperty('message') ? error.message :
'Unknown Error',
metadata: error.hasOwnProperty('metadata') ? error.metadata :
this.metadata as Metadata
};
if (error.hasOwnProperty('code') && Number.isInteger(error.code)) {
status.code = error.code;
if (error.hasOwnProperty('details')) {
status.details = error.details;
}
} else {
status.code = code;
}
if (error.hasOwnProperty('metadata')) {
status.metadata = error.metadata;
}
this.end();
this.sendStatus(status);
}
write(chunk: Buffer) {
@ -437,28 +455,16 @@ export class Http2ServerCallStream<RequestType, ResponseType> extends
this.sendMetadata();
return this.stream.write(chunk);
}
end(payload?: Buffer) {
if (this.cancelled === true) {
return;
}
if (this.deadline !== null) {
clearTimeout(this.deadline);
this.deadline = null;
}
this.sendMetadata();
return this.stream.end(payload);
}
}
// tslint:disable:no-any
type UntypedServerCall = Http2ServerCallStream<any, any>;
function handleExpiredDeadline(call: UntypedServerCall) {
call.sendError(
new Error('Deadline exceeded') as ServiceError, Status.DEADLINE_EXCEEDED);
const err = new Error('Deadline exceeded') as ServiceError;
err.code = Status.DEADLINE_EXCEEDED;
call.sendError(err);
call.cancelled = true;
call.emit('cancelled', 'deadline');
}

View File

@ -20,6 +20,7 @@ import {AddressInfo, ListenOptions} from 'net';
import {URL} from 'url';
import {ServiceError} from './call';
import {StatusObject} from './call-stream';
import {Status} from './constants';
import {Deserialize, Serialize, ServiceDefinition} from './make-client';
import {Metadata} from './metadata';
@ -273,7 +274,8 @@ export class Server {
}
} catch (err) {
const call = new Http2ServerCallStream(stream, null);
call.sendError(err, Status.INTERNAL);
err.code = Status.INTERNAL;
call.sendError(err);
}
});
}