Make some filter types synchronous

This commit is contained in:
murgatroid99 2019-10-21 18:04:17 -07:00
parent 97ed462a00
commit caa07ef883
5 changed files with 40 additions and 65 deletions

View File

@ -103,13 +103,6 @@ export class Http2CallStream extends Duplex implements Call {
// Status code mapped from :status. To be used if grpc-status is not received
private mappedStatusCode: Status = Status.UNKNOWN;
// Promise objects that are re-assigned to resolving promises when headers
// or trailers received. Processing headers/trailers is asynchronous, so we
// can use these objects to await their completion. This helps us establish
// order of precedence when obtaining the status of the call.
private handlingHeaders = Promise.resolve();
private handlingTrailers = Promise.resolve();
// This is populated (non-null) if and only if the call has ended
private finalStatus: StatusObject | null = null;
@ -224,30 +217,21 @@ export class Http2CallStream extends Duplex implements Call {
metadata = new Metadata();
}
const status: StatusObject = { code, details, metadata };
this.handlingTrailers = (async () => {
let finalStatus;
try {
// Attempt to assign final status.
finalStatus = await this.filterStack.receiveTrailers(
Promise.resolve(status)
);
} catch (error) {
await this.handlingHeaders;
// This is a no-op if the call was already ended when handling headers.
this.endCall({
code: Status.INTERNAL,
details: 'Failed to process received status',
metadata: new Metadata(),
});
return;
}
// It's possible that headers were received but not fully handled yet.
// Give the headers handler an opportunity to end the call first,
// if an error occurred.
await this.handlingHeaders;
let finalStatus;
try {
// Attempt to assign final status.
finalStatus = this.filterStack.receiveTrailers(status);
} catch (error) {
// This is a no-op if the call was already ended when handling headers.
this.endCall(finalStatus);
})();
this.endCall({
code: Status.INTERNAL,
details: 'Failed to process received status',
metadata: new Metadata(),
});
return;
}
// This is a no-op if the call was already ended when handling headers.
this.endCall(finalStatus);
}
attachHttp2Stream(stream: http2.ClientHttp2Stream, subchannel: Subchannel): void {
@ -297,19 +281,17 @@ export class Http2CallStream extends Duplex implements Call {
});
return;
}
this.handlingHeaders = this.filterStack
.receiveMetadata(Promise.resolve(metadata))
.then(finalMetadata => {
this.emit('metadata', finalMetadata);
})
.catch(error => {
this.destroyHttp2Stream();
this.endCall({
code: Status.UNKNOWN,
details: error.message,
metadata: new Metadata(),
});
try {
const finalMetadata = this.filterStack.receiveMetadata(metadata);
this.emit('metadata', finalMetadata);
} catch (error) {
this.destroyHttp2Stream();
this.endCall({
code: Status.UNKNOWN,
details: error.message,
metadata: new Metadata(),
});
}
}
});
stream.on('trailers', this.handleTrailers.bind(this));
@ -346,9 +328,6 @@ export class Http2CallStream extends Duplex implements Call {
default:
code = Status.INTERNAL;
}
// This guarantees that if trailers were received, the value of the
// 'grpc-status' header takes precedence for emitted status data.
await this.handlingTrailers;
// This is a no-op if trailers were received at all.
// This is OK, because status codes emitted here correspond to more
// catastrophic issues that prevent us from receiving trailers in the
@ -392,9 +371,6 @@ export class Http2CallStream extends Duplex implements Call {
cancelWithStatus(status: Status, details: string): void {
this.destroyHttp2Stream();
(async () => {
// If trailers are currently being processed, the call should be ended
// by handleTrailers instead.
await this.handlingTrailers;
this.endCall({ code: status, details, metadata: new Metadata() });
})();
}

View File

@ -176,18 +176,17 @@ export class CompressionFilter extends BaseFilter implements Filter {
return headers;
}
async receiveMetadata(metadata: Promise<Metadata>): Promise<Metadata> {
const headers: Metadata = await metadata;
const receiveEncoding: MetadataValue[] = headers.get('grpc-encoding');
receiveMetadata(metadata: Metadata): Metadata {
const receiveEncoding: MetadataValue[] = metadata.get('grpc-encoding');
if (receiveEncoding.length > 0) {
const encoding: MetadataValue = receiveEncoding[0];
if (typeof encoding === 'string') {
this.receiveCompression = getCompressionHandler(encoding);
}
}
headers.remove('grpc-encoding');
headers.remove('grpc-accept-encoding');
return headers;
metadata.remove('grpc-encoding');
metadata.remove('grpc-accept-encoding');
return metadata;
}
async sendMessage(message: Promise<WriteObject>): Promise<WriteObject> {

View File

@ -32,8 +32,8 @@ export class FilterStack implements Filter {
return result;
}
receiveMetadata(metadata: Promise<Metadata>) {
let result: Promise<Metadata> = metadata;
receiveMetadata(metadata: Metadata) {
let result: Metadata = metadata;
for (let i = this.filters.length - 1; i >= 0; i--) {
result = this.filters[i].receiveMetadata(result);
@ -62,8 +62,8 @@ export class FilterStack implements Filter {
return result;
}
receiveTrailers(status: Promise<StatusObject>): Promise<StatusObject> {
let result: Promise<StatusObject> = status;
receiveTrailers(status: StatusObject): StatusObject {
let result: StatusObject = status;
for (let i = this.filters.length - 1; i >= 0; i--) {
result = this.filters[i].receiveTrailers(result);

View File

@ -25,21 +25,21 @@ import { Metadata } from './metadata';
export interface Filter {
sendMetadata(metadata: Promise<Metadata>): Promise<Metadata>;
receiveMetadata(metadata: Promise<Metadata>): Promise<Metadata>;
receiveMetadata(metadata: Metadata): Metadata;
sendMessage(message: Promise<WriteObject>): Promise<WriteObject>;
receiveMessage(message: Promise<Buffer>): Promise<Buffer>;
receiveTrailers(status: Promise<StatusObject>): Promise<StatusObject>;
receiveTrailers(status: StatusObject): StatusObject;
}
export abstract class BaseFilter {
export abstract class BaseFilter implements Filter {
async sendMetadata(metadata: Promise<Metadata>): Promise<Metadata> {
return metadata;
}
async receiveMetadata(metadata: Promise<Metadata>): Promise<Metadata> {
receiveMetadata(metadata: Metadata): Metadata {
return metadata;
}
@ -51,7 +51,7 @@ export abstract class BaseFilter {
return message;
}
async receiveTrailers(status: Promise<StatusObject>): Promise<StatusObject> {
receiveTrailers(status: StatusObject): StatusObject {
return status;
}
}

View File

@ -22,9 +22,9 @@ import { Status } from './constants';
import { BaseFilter, Filter, FilterFactory } from './filter';
export class MetadataStatusFilter extends BaseFilter implements Filter {
async receiveTrailers(status: Promise<StatusObject>): Promise<StatusObject> {
receiveTrailers(status: StatusObject): StatusObject {
// tslint:disable-next-line:prefer-const
let { code, details, metadata } = await status;
let { code, details, metadata } = status;
if (code !== Status.UNKNOWN) {
// we already have a known status, so don't assign a new one.
return { code, details, metadata };