Rework call stream API to work better with interceptor APIs

This commit is contained in:
murgatroid99 2019-10-23 16:28:00 -07:00
parent caa07ef883
commit 3144cb6ada
4 changed files with 244 additions and 165 deletions

View File

@ -16,15 +16,12 @@
*/
import * as http2 from 'http2';
import { Duplex } from 'stream';
import { CallCredentials } from './call-credentials';
import { Status } from './constants';
import { EmitterAugmentation1 } from './events';
import { Filter } from './filter';
import { FilterStackFactory } from './filter-stack';
import { Metadata } from './metadata';
import { ObjectDuplex, WriteCallback } from './object-stream';
import { StreamDecoder } from './stream-decoder';
import { ChannelImplementation } from './channel';
import { Subchannel } from './subchannel';
@ -63,39 +60,84 @@ export interface WriteObject {
flags?: number;
}
/**
* This interface represents a duplex stream associated with a single gRPC call.
*/
export interface MetadataListener {
(metadata: Metadata, next: (metadata: Metadata) => void): void;
}
export interface MessageListener {
(message: any, next: (message: any) => void): void;
}
export interface StatusListener {
(status: StatusObject, next: (status: StatusObject) => void): void;
}
export interface FullListener {
onReceiveMetadata: MetadataListener;
onReceiveMessage: MessageListener;
onReceiveStatus: StatusListener;
}
export type Listener = Partial<FullListener>;
export interface InterceptingListener {
onReceiveMetadata(metadata: Metadata): void;
onReceiveMessage(message: any): void;
onReceiveStatus(status: StatusObject): void;
}
class InterceptingListenerImpl implements InterceptingListener {
constructor(private listener: FullListener, private nextListener: InterceptingListener) {}
onReceiveMetadata(metadata: Metadata): void {
const next = this.nextListener.onReceiveMetadata.bind(this.nextListener);
this.listener.onReceiveMetadata(metadata, next);
}
onReceiveMessage(message: any): void {
const next = this.nextListener.onReceiveMessage.bind(this.nextListener);
this.listener.onReceiveMessage(message, next);
}
onReceiveStatus(status: StatusObject): void {
const next = this.nextListener.onReceiveStatus.bind(this.nextListener);
this.listener.onReceiveStatus(status, next);
}
}
export interface WriteCallback {
(error?: Error | null): void;
}
export type Call = {
cancelWithStatus(status: Status, details: string): void;
getPeer(): string;
sendMetadata(metadata: Metadata): void;
start(metadata: Metadata, listener: InterceptingListener): void;
write(writeObj: WriteObject, callback: WriteCallback): void;
startRead(): void;
halfClose(): void;
getDeadline(): Deadline;
getCredentials(): CallCredentials;
setCredentials(credentials: CallCredentials): void;
/* If the return value is null, the call has not ended yet. Otherwise, it has
* ended with the specified status */
getStatus(): StatusObject | null;
getMethod(): string;
getHost(): string;
} & EmitterAugmentation1<'metadata', Metadata> &
EmitterAugmentation1<'status', StatusObject> &
ObjectDuplex<WriteObject, Buffer>;
}
export class Http2CallStream extends Duplex implements Call {
export class Http2CallStream implements Call {
credentials: CallCredentials;
filterStack: Filter;
private http2Stream: http2.ClientHttp2Stream | null = null;
private pendingRead = false;
private pendingWrite: Buffer | null = null;
private pendingWriteCallback: WriteCallback | null = null;
private pendingFinalCallback: Function | null = null;
private writesClosed = false;
private decoder = new StreamDecoder();
private isReadFilterPending = false;
private canPush = false;
private readsClosed = false;
private statusOutput = false;
private unpushedReadMessages: Array<Buffer | null> = [];
private unfilteredReadMessages: Array<Buffer | null> = [];
@ -109,6 +151,8 @@ export class Http2CallStream extends Duplex implements Call {
private subchannel: Subchannel | null = null;
private disconnectListener: () => void;
private listener: InterceptingListener | null = null;
constructor(
private readonly methodName: string,
private readonly channel: ChannelImplementation,
@ -116,7 +160,6 @@ export class Http2CallStream extends Duplex implements Call {
filterStackFactory: FilterStackFactory,
private readonly channelCallCredentials: CallCredentials
) {
super({ objectMode: true });
this.filterStack = filterStackFactory.createFilter(this);
this.credentials = channelCallCredentials;
this.disconnectListener = () => {
@ -124,14 +167,10 @@ export class Http2CallStream extends Duplex implements Call {
};
}
/**
* On first call, emits a 'status' event with the given StatusObject.
* Subsequent calls are no-ops.
* @param status The status of the call.
*/
private endCall(status: StatusObject): void {
if (this.finalStatus === null) {
this.finalStatus = status;
private outputStatus() {
/* Precondition: this.finalStatus !== null */
if (!this.statusOutput) {
this.statusOutput = true;
/* We do this asynchronously to ensure that no async function is in the
* call stack when we return control to the application. If an async
* function is in the call stack, any exception thrown by the application
@ -140,7 +179,7 @@ export class Http2CallStream extends Duplex implements Call {
* a warning, the error will be effectively swallowed and execution will
* continue */
process.nextTick(() => {
this.emit('status', status);
this.listener!.onReceiveStatus(this.finalStatus!);
});
if (this.subchannel) {
this.subchannel.callUnref();
@ -149,6 +188,35 @@ export class Http2CallStream extends Duplex implements Call {
}
}
/**
* On first call, emits a 'status' event with the given StatusObject.
* Subsequent calls are no-ops.
* @param status The status of the call.
*/
private endCall(status: StatusObject): void {
/* If the status is OK and a new status comes in (e.g. from a
* deserialization failure), that new status takes priority */
if (this.finalStatus === null || this.finalStatus.code === Status.OK) {
this.finalStatus = status;
/* Then, if an incoming message is still being handled or the status code
* is OK, hold off on emitting the status until that is done */
if (this.readsClosed || this.finalStatus.code !== Status.OK) {
this.outputStatus();
}
}
}
private push(message: Buffer | null): void {
if (message === null) {
this.readsClosed = true;
if (this.finalStatus) {
this.outputStatus();
}
} else {
this.listener!.onReceiveMessage(message);
}
}
private handleFilterError(error: Error) {
this.cancelWithStatus(Status.INTERNAL, error.message);
}
@ -157,14 +225,14 @@ export class Http2CallStream extends Duplex implements Call {
/* If we the call has already ended, we don't want to do anything with
* this message. Dropping it on the floor is correct behavior */
if (this.finalStatus !== null) {
this.push(null);
return;
}
this.isReadFilterPending = false;
if (this.canPush) {
if (!this.push(message)) {
this.canPush = false;
(this.http2Stream as http2.ClientHttp2Stream).pause();
}
this.push(message)
this.canPush = false;
this.http2Stream!.pause();
} else {
this.unpushedReadMessages.push(message);
}
@ -180,6 +248,7 @@ export class Http2CallStream extends Duplex implements Call {
/* If we the call has already ended, we don't want to do anything with
* this message. Dropping it on the floor is correct behavior */
if (this.finalStatus !== null) {
this.push(null);
return;
}
if (framedMessage === null) {
@ -283,7 +352,7 @@ export class Http2CallStream extends Duplex implements Call {
}
try {
const finalMetadata = this.filterStack.receiveMetadata(metadata);
this.emit('metadata', finalMetadata);
this.listener!.onReceiveMetadata(finalMetadata);
} catch (error) {
this.destroyHttp2Stream();
this.endCall({
@ -348,13 +417,14 @@ export class Http2CallStream extends Duplex implements Call {
}
stream.write(this.pendingWrite, this.pendingWriteCallback);
}
if (this.pendingFinalCallback) {
stream.end(this.pendingFinalCallback);
if (this.writesClosed) {
stream.end();
}
}
}
sendMetadata(metadata: Metadata): void {
start(metadata: Metadata, listener: InterceptingListener) {
this.listener = listener;
this.channel._startCallStream(this, metadata);
}
@ -370,9 +440,7 @@ export class Http2CallStream extends Duplex implements Call {
cancelWithStatus(status: Status, details: string): void {
this.destroyHttp2Stream();
(async () => {
this.endCall({ code: status, details, metadata: new Metadata() });
})();
this.endCall({ code: status, details, metadata: new Metadata() });
}
getDeadline(): Deadline {
@ -403,7 +471,7 @@ export class Http2CallStream extends Duplex implements Call {
return this.options.host;
}
_read(size: number) {
startRead() {
/* If we have already emitted a status, we should not emit any more
* messages and we should communicate that the stream has ended */
if (this.finalStatus !== null) {
@ -414,13 +482,11 @@ export class Http2CallStream extends Duplex implements Call {
if (this.http2Stream === null) {
this.pendingRead = true;
} else {
while (this.unpushedReadMessages.length > 0) {
const nextMessage = this.unpushedReadMessages.shift();
this.canPush = this.push(nextMessage);
if (nextMessage === null || !this.canPush) {
this.canPush = false;
return;
}
if (this.unpushedReadMessages.length > 0) {
const nextMessage: Buffer | null = this.unpushedReadMessages.shift() as Buffer | null;
this.push(nextMessage);
this.canPush = false;
return;
}
/* Only resume reading from the http2Stream if we don't have any pending
* messages to emit, and we haven't gotten the signal to stop pushing
@ -429,8 +495,8 @@ export class Http2CallStream extends Duplex implements Call {
}
}
_write(chunk: WriteObject, encoding: string, cb: WriteCallback) {
this.filterStack.sendMessage(Promise.resolve(chunk)).then(message => {
write(writeObj: WriteObject, cb: WriteCallback) {
this.filterStack.sendMessage(Promise.resolve(writeObj)).then(message => {
if (this.http2Stream === null) {
this.pendingWrite = message.message;
this.pendingWriteCallback = cb;
@ -440,11 +506,10 @@ export class Http2CallStream extends Duplex implements Call {
}, this.handleFilterError.bind(this));
}
_final(cb: Function) {
if (this.http2Stream === null) {
this.pendingFinalCallback = cb;
} else {
this.http2Stream.end(cb);
halfClose() {
this.writesClosed = true;
if (this.http2Stream !== null) {
this.http2Stream.end();
}
}
}
}

View File

@ -22,7 +22,7 @@ import { Call, StatusObject, WriteObject } from './call-stream';
import { Status } from './constants';
import { EmitterAugmentation1 } from './events';
import { Metadata } from './metadata';
import { ObjectReadable, ObjectWritable } from './object-stream';
import { ObjectReadable, ObjectWritable, WriteCallback } from './object-stream';
/**
* A type extending the built-in Error object with additional fields.
@ -86,12 +86,6 @@ export class ClientUnaryCallImpl extends EventEmitter
implements ClientUnaryCall {
constructor(private readonly call: Call) {
super();
call.on('metadata', (metadata: Metadata) => {
this.emit('metadata', metadata);
});
call.on('status', (status: StatusObject) => {
this.emit('status', status);
});
}
cancel(): void {
@ -103,43 +97,6 @@ export class ClientUnaryCallImpl extends EventEmitter
}
}
function setUpReadableStream<ResponseType>(
stream: ClientReadableStream<ResponseType>,
call: Call,
deserialize: (chunk: Buffer) => ResponseType
): void {
let statusEmitted = false;
call.on('data', (data: Buffer) => {
let deserialized: ResponseType;
try {
deserialized = deserialize(data);
} catch (e) {
call.cancelWithStatus(Status.INTERNAL, 'Failed to parse server response');
return;
}
if (!stream.push(deserialized)) {
call.pause();
}
});
call.on('end', () => {
if (statusEmitted) {
stream.push(null);
} else {
call.once('status', () => {
stream.push(null);
});
}
});
call.on('status', (status: StatusObject) => {
if (status.code !== Status.OK) {
stream.emit('error', callErrorFromStatus(status));
}
stream.emit('status', status);
statusEmitted = true;
});
call.pause();
}
export class ClientReadableStreamImpl<ResponseType> extends Readable
implements ClientReadableStream<ResponseType> {
constructor(
@ -147,10 +104,6 @@ export class ClientReadableStreamImpl<ResponseType> extends Readable
readonly deserialize: (chunk: Buffer) => ResponseType
) {
super({ objectMode: true });
call.on('metadata', (metadata: Metadata) => {
this.emit('metadata', metadata);
});
setUpReadableStream<ResponseType>(this, call, deserialize);
}
cancel(): void {
@ -162,7 +115,7 @@ export class ClientReadableStreamImpl<ResponseType> extends Readable
}
_read(_size: number): void {
this.call.resume();
this.call.startRead();
}
}
@ -171,7 +124,7 @@ function tryWrite<RequestType>(
serialize: (value: RequestType) => Buffer,
chunk: RequestType,
encoding: string,
cb: Function
cb: WriteCallback
) {
let message: Buffer;
const flags: number = Number(encoding);
@ -196,12 +149,6 @@ export class ClientWritableStreamImpl<RequestType> extends Writable
readonly serialize: (value: RequestType) => Buffer
) {
super({ objectMode: true });
call.on('metadata', (metadata: Metadata) => {
this.emit('metadata', metadata);
});
call.on('status', (status: StatusObject) => {
this.emit('status', status);
});
}
cancel(): void {
@ -212,12 +159,12 @@ export class ClientWritableStreamImpl<RequestType> extends Writable
return this.call.getPeer();
}
_write(chunk: RequestType, encoding: string, cb: Function) {
_write(chunk: RequestType, encoding: string, cb: WriteCallback) {
tryWrite<RequestType>(this.call, this.serialize, chunk, encoding, cb);
}
_final(cb: Function) {
this.call.end();
this.call.halfClose();
cb();
}
}
@ -230,10 +177,6 @@ export class ClientDuplexStreamImpl<RequestType, ResponseType> extends Duplex
readonly deserialize: (chunk: Buffer) => ResponseType
) {
super({ objectMode: true });
call.on('metadata', (metadata: Metadata) => {
this.emit('metadata', metadata);
});
setUpReadableStream<ResponseType>(this, call, deserialize);
}
cancel(): void {
@ -245,15 +188,15 @@ export class ClientDuplexStreamImpl<RequestType, ResponseType> extends Duplex
}
_read(_size: number): void {
this.call.resume();
this.call.startRead();
}
_write(chunk: RequestType, encoding: string, cb: Function) {
_write(chunk: RequestType, encoding: string, cb: WriteCallback) {
tryWrite<RequestType>(this.call, this.serialize, chunk, encoding, cb);
}
_final(cb: Function) {
this.call.end();
this.call.halfClose();
cb();
}
}

View File

@ -26,9 +26,10 @@ import {
ClientWritableStreamImpl,
ServiceError,
callErrorFromStatus,
SurfaceCall,
} from './call';
import { CallCredentials } from './call-credentials';
import { Call, Deadline, StatusObject, WriteObject } from './call-stream';
import { Call, Deadline, StatusObject, WriteObject, InterceptingListener } from './call-stream';
import { Channel, ConnectivityState, ChannelImplementation } from './channel';
import { ChannelCredentials } from './channel-credentials';
import { ChannelOptions } from './channel-options';
@ -125,38 +126,6 @@ export class Client {
setImmediate(checkState);
}
private handleUnaryResponse<ResponseType>(
call: Call,
deserialize: (value: Buffer) => ResponseType,
callback: UnaryCallback<ResponseType>
): void {
let responseMessage: ResponseType | null = null;
call.on('data', (data: Buffer) => {
if (responseMessage != null) {
call.cancelWithStatus(Status.INTERNAL, 'Too many responses received');
}
try {
responseMessage = deserialize(data);
} catch (e) {
call.cancelWithStatus(
Status.INTERNAL,
'Failed to parse server response'
);
}
});
call.on('status', (status: StatusObject) => {
/* We assume that call emits status after it emits end, and that it
* accounts for any cancelWithStatus calls up until it emits status.
* Therefore, considering the above event handlers, status.code should be
* OK if and only if we have a non-null responseMessage */
if (status.code === Status.OK) {
callback(null, responseMessage as ResponseType);
} else {
callback(callErrorFromStatus(status));
}
});
}
private checkOptionalUnaryResponseArguments<ResponseType>(
arg1: Metadata | CallOptions | UnaryCallback<ResponseType>,
arg2?: CallOptions | UnaryCallback<ResponseType>,
@ -244,11 +213,38 @@ export class Client {
}
const message: Buffer = serialize(argument);
const writeObj: WriteObject = { message };
call.sendMetadata(metadata);
call.write(writeObj);
call.end();
this.handleUnaryResponse<ResponseType>(call, deserialize, callback);
return new ClientUnaryCallImpl(call);
const emitter = new ClientUnaryCallImpl(call);
let responseMessage: ResponseType | null = null;
call.start(metadata, {
onReceiveMetadata: (metadata) => {
emitter.emit('metadata', metadata);
},
onReceiveMessage(message: Buffer) {
if (responseMessage != null) {
call.cancelWithStatus(Status.INTERNAL, 'Too many responses received');
}
try {
responseMessage = deserialize(message);
} catch (e) {
call.cancelWithStatus(
Status.INTERNAL,
'Failed to parse server response'
);
}
call.startRead();
},
onReceiveStatus(status: StatusObject) {
if (status.code === Status.OK) {
callback!(null, responseMessage!);
} else {
callback!(callErrorFromStatus(status));
}
emitter.emit('status', status);
}
});
call.write(writeObj, () => {call.halfClose();});
call.startRead();
return emitter;
}
makeClientStreamRequest<RequestType, ResponseType>(
@ -300,9 +296,37 @@ export class Client {
if (options.credentials) {
call.setCredentials(options.credentials);
}
call.sendMetadata(metadata);
this.handleUnaryResponse<ResponseType>(call, deserialize, callback);
return new ClientWritableStreamImpl<RequestType>(call, serialize);
const emitter = new ClientWritableStreamImpl<RequestType>(call, serialize);
let responseMessage: ResponseType | null = null;
call.start(metadata, {
onReceiveMetadata: (metadata) => {
emitter.emit('metadata', metadata);
},
onReceiveMessage(message: Buffer) {
if (responseMessage != null) {
call.cancelWithStatus(Status.INTERNAL, 'Too many responses received');
}
try {
responseMessage = deserialize(message);
} catch (e) {
call.cancelWithStatus(
Status.INTERNAL,
'Failed to parse server response'
);
}
call.startRead();
},
onReceiveStatus(status: StatusObject) {
if (status.code === Status.OK) {
callback!(null, responseMessage!);
} else {
callback!(callErrorFromStatus(status));
}
emitter.emit('status', status);
}
});
call.startRead();
return emitter;
}
private checkMetadataAndOptions(
@ -365,10 +389,33 @@ export class Client {
}
const message: Buffer = serialize(argument);
const writeObj: WriteObject = { message };
call.sendMetadata(metadata);
call.write(writeObj);
call.end();
return new ClientReadableStreamImpl<ResponseType>(call, deserialize);
const stream = new ClientReadableStreamImpl<ResponseType>(call, deserialize);
call.start(metadata, {
onReceiveMetadata(metadata: Metadata) {
stream.emit('metadata', metadata);
},
onReceiveMessage(message: Buffer) {
let deserialized: ResponseType;
try {
deserialized = deserialize(message);
} catch (e) {
call.cancelWithStatus(Status.INTERNAL, 'Failed to parse server response');
return;
}
if (stream.push(deserialized)) {
call.startRead();
}
},
onReceiveStatus(status: StatusObject) {
stream.push(null);
if (status.code !== Status.OK) {
stream.emit('error', callErrorFromStatus(status));
}
stream.emit('status', status);
}
});
call.write(writeObj, () => {call.halfClose();});
return stream;
}
makeBidiStreamRequest<RequestType, ResponseType>(
@ -402,11 +449,35 @@ export class Client {
if (options.credentials) {
call.setCredentials(options.credentials);
}
call.sendMetadata(metadata);
return new ClientDuplexStreamImpl<RequestType, ResponseType>(
const stream = new ClientDuplexStreamImpl<RequestType, ResponseType>(
call,
serialize,
deserialize
);
call.start(metadata, {
onReceiveMetadata(metadata: Metadata) {
stream.emit('metadata', metadata);
},
onReceiveMessage(message: Buffer) {
let deserialized: ResponseType;
try {
deserialized = deserialize(message);
} catch (e) {
call.cancelWithStatus(Status.INTERNAL, 'Failed to parse server response');
return;
}
if (stream.push(deserialized)) {
call.startRead();
}
},
onReceiveStatus(status: StatusObject) {
stream.push(null);
if (status.code !== Status.OK) {
stream.emit('error', callErrorFromStatus(status));
}
stream.emit('status', status);
}
});
return stream;
}
}

View File

@ -66,7 +66,7 @@ export class DeadlineFilter extends BaseFilter implements Filter {
'Deadline exceeded'
);
}, timeout);
callStream.on('status', () => clearTimeout(this.timer as NodeJS.Timer));
this.timer.unref();
}
}