From 33875dce4a476b736626a937ee6e7f78ec5e34f1 Mon Sep 17 00:00:00 2001 From: murgatroid99 Date: Thu, 14 Nov 2019 15:02:24 -0800 Subject: [PATCH] grpc-js: make client interceptors tests pass mostly unmodified --- packages/grpc-js/src/call-stream.ts | 107 +- packages/grpc-js/src/call.ts | 50 +- packages/grpc-js/src/client-interceptors.ts | 57 +- packages/grpc-js/src/client.ts | 44 +- packages/grpc-js/src/server-call.ts | 36 +- packages/grpc-js/src/server.ts | 1 - test/api/client_interceptors_test.js | 3289 ++++++++++--------- 7 files changed, 1880 insertions(+), 1704 deletions(-) diff --git a/packages/grpc-js/src/call-stream.ts b/packages/grpc-js/src/call-stream.ts index e3ffb5eb..7cd6d68f 100644 --- a/packages/grpc-js/src/call-stream.ts +++ b/packages/grpc-js/src/call-stream.ts @@ -152,6 +152,7 @@ export class Http2CallStream implements Call { filterStack: Filter; private http2Stream: http2.ClientHttp2Stream | null = null; private pendingRead = false; + private isWriteFilterPending = false; private pendingWrite: Buffer | null = null; private pendingWriteCallback: WriteCallback | null = null; private writesClosed = false; @@ -160,12 +161,16 @@ export class Http2CallStream implements Call { private isReadFilterPending = false; private canPush = false; + /** + * Indicates that an 'end' event has come from the http2 stream, so there + * will be no more data events. + */ private readsClosed = false; private statusOutput = false; - private unpushedReadMessages: Array = []; - private unfilteredReadMessages: Array = []; + private unpushedReadMessages: Buffer[] = []; + private unfilteredReadMessages: Buffer[] = []; // Status code mapped from :status. To be used if grpc-status is not received private mappedStatusCode: Status = Status.UNKNOWN; @@ -200,16 +205,7 @@ export class Http2CallStream implements Call { /* Precondition: this.finalStatus !== null */ if (!this.statusOutput) { this.statusOutput = true; - /* We do this asynchronously to ensure that no async function is in the - * call stack when we return control to the application. If an async - * function is in the call stack, any exception thrown by the application - * (or our tests) will bubble up and turn into promise rejection, which - * will result in an UnhandledPromiseRejectionWarning. Because that is - * a warning, the error will be effectively swallowed and execution will - * continue */ - process.nextTick(() => { - this.listener!.onReceiveStatus(this.finalStatus!); - }); + this.listener!.onReceiveStatus(this.finalStatus!); if (this.subchannel) { this.subchannel.callUnref(); this.subchannel.removeDisconnectListener(this.disconnectListener); @@ -227,30 +223,24 @@ export class Http2CallStream implements Call { * deserialization failure), that new status takes priority */ if (this.finalStatus === null || this.finalStatus.code === Status.OK) { this.finalStatus = status; - /* Then, if an incoming message is still being handled or the status code - * is OK, hold off on emitting the status until that is done */ - if (this.readsClosed || this.finalStatus.code !== Status.OK) { + this.maybeOutputStatus(); + } + } + + private maybeOutputStatus() { + if (this.finalStatus !== null) { + /* The combination check of readsClosed and that the two message buffer + * arrays are empty checks that there all incoming data has been fully + * processed */ + if (this.finalStatus.code !== Status.OK || (this.readsClosed && this.unpushedReadMessages.length === 0 && this.unfilteredReadMessages.length === 0 && !this.isReadFilterPending)) { this.outputStatus(); } } } - private push(message: Buffer | null): void { - if (message === null) { - this.readsClosed = true; - if (this.finalStatus) { - this.outputStatus(); - } - } else { - this.listener!.onReceiveMessage(message); - /* Don't wait for the upper layer to ask for a read before pushing null - * to close out the call, because pushing null doesn't actually push - * another message up to the upper layer */ - if (this.unpushedReadMessages.length > 0 && this.unpushedReadMessages[0] === null) { - this.unpushedReadMessages.shift(); - this.push(null); - } - } + private push(message: Buffer): void { + this.listener!.onReceiveMessage(message); + this.maybeOutputStatus(); } private handleFilterError(error: Error) { @@ -261,7 +251,7 @@ export class Http2CallStream implements Call { /* If we the call has already ended, we don't want to do anything with * this message. Dropping it on the floor is correct behavior */ if (this.finalStatus !== null) { - this.push(null); + this.maybeOutputStatus(); return; } this.isReadFilterPending = false; @@ -275,24 +265,16 @@ export class Http2CallStream implements Call { if (this.unfilteredReadMessages.length > 0) { /* nextMessage is guaranteed not to be undefined because unfilteredReadMessages is non-empty */ - const nextMessage = this.unfilteredReadMessages.shift() as Buffer | null; + const nextMessage = this.unfilteredReadMessages.shift()!; this.filterReceivedMessage(nextMessage); } } - private filterReceivedMessage(framedMessage: Buffer | null) { + private filterReceivedMessage(framedMessage: Buffer) { /* If we the call has already ended, we don't want to do anything with * this message. Dropping it on the floor is correct behavior */ if (this.finalStatus !== null) { - this.push(null); - return; - } - if (framedMessage === null) { - if (this.canPush) { - this.push(null); - } else { - this.unpushedReadMessages.push(null); - } + this.maybeOutputStatus(); return; } this.isReadFilterPending = true; @@ -304,7 +286,7 @@ export class Http2CallStream implements Call { ); } - private tryPush(messageBytes: Buffer | null): void { + private tryPush(messageBytes: Buffer): void { if (this.isReadFilterPending) { this.unfilteredReadMessages.push(messageBytes); } else { @@ -411,12 +393,23 @@ export class Http2CallStream implements Call { } }); stream.on('end', () => { - this.tryPush(null); + this.readsClosed = true; + this.maybeOutputStatus(); }); - stream.on('close', async () => { + stream.on('close', () => { let code: Status; let details = ''; switch (stream.rstCode) { + case http2.constants.NGHTTP2_NO_ERROR: + /* If we get a NO_ERROR code and we already have a status, the + * stream completed properly and we just haven't fully processed + * it yet */ + if (this.finalStatus !== null) { + return; + } + code = Status.INTERNAL; + details = `Received RST_STREAM with code ${stream.rstCode}`; + break; case http2.constants.NGHTTP2_REFUSED_STREAM: code = Status.UNAVAILABLE; details = 'Stream refused by server'; @@ -435,6 +428,7 @@ export class Http2CallStream implements Call { break; default: code = Status.INTERNAL; + details = `Received RST_STREAM with code ${stream.rstCode}`; } // This is a no-op if trailers were received at all. // This is OK, because status codes emitted here correspond to more @@ -456,9 +450,7 @@ export class Http2CallStream implements Call { } stream.write(this.pendingWrite, this.pendingWriteCallback); } - if (this.writesClosed) { - stream.end(); - } + this.maybeCloseWrites(); } } @@ -514,7 +506,7 @@ export class Http2CallStream implements Call { /* If we have already emitted a status, we should not emit any more * messages and we should communicate that the stream has ended */ if (this.finalStatus !== null) { - this.push(null); + this.maybeOutputStatus(); return; } this.canPush = true; @@ -522,7 +514,7 @@ export class Http2CallStream implements Call { this.pendingRead = true; } else { if (this.unpushedReadMessages.length > 0) { - const nextMessage: Buffer | null = this.unpushedReadMessages.shift() as Buffer | null; + const nextMessage: Buffer = this.unpushedReadMessages.shift()!; this.push(nextMessage); this.canPush = false; return; @@ -534,26 +526,33 @@ export class Http2CallStream implements Call { } } + private maybeCloseWrites() { + if (this.writesClosed && !this.isWriteFilterPending && this.http2Stream !== null) { + this.http2Stream.end(); + } + } + sendMessageWithContext(context: MessageContext, message: Buffer) { const writeObj: WriteObject = { message: message, flags: context.flags }; const cb: WriteCallback = context.callback || (() => {}); + this.isWriteFilterPending = true; this.filterStack.sendMessage(Promise.resolve(writeObj)).then(message => { + this.isWriteFilterPending = false; if (this.http2Stream === null) { this.pendingWrite = message.message; this.pendingWriteCallback = cb; } else { this.http2Stream.write(message.message, cb); + this.maybeCloseWrites(); } }, this.handleFilterError.bind(this)); } halfClose() { this.writesClosed = true; - if (this.http2Stream !== null) { - this.http2Stream.end(); - } + this.maybeCloseWrites(); } } \ No newline at end of file diff --git a/packages/grpc-js/src/call.ts b/packages/grpc-js/src/call.ts index c5f87a00..42d0886f 100644 --- a/packages/grpc-js/src/call.ts +++ b/packages/grpc-js/src/call.ts @@ -18,11 +18,12 @@ import { EventEmitter } from 'events'; import { Duplex, Readable, Writable } from 'stream'; -import { Call, StatusObject, WriteObject } from './call-stream'; +import { StatusObject, MessageContext } from './call-stream'; import { Status } from './constants'; import { EmitterAugmentation1 } from './events'; import { Metadata } from './metadata'; import { ObjectReadable, ObjectWritable, WriteCallback } from './object-stream'; +import { InterceptingCallInterface } from './client-interceptors'; /** * A type extending the built-in Error object with additional fields. @@ -81,7 +82,7 @@ export function callErrorFromStatus(status: StatusObject): ServiceError { export class ClientUnaryCallImpl extends EventEmitter implements ClientUnaryCall { - constructor(private readonly call: Call) { + constructor(private readonly call: InterceptingCallInterface) { super(); } @@ -97,7 +98,7 @@ export class ClientUnaryCallImpl extends EventEmitter export class ClientReadableStreamImpl extends Readable implements ClientReadableStream { constructor( - private readonly call: Call, + private readonly call: InterceptingCallInterface, readonly deserialize: (chunk: Buffer) => ResponseType ) { super({ objectMode: true }); @@ -116,33 +117,10 @@ export class ClientReadableStreamImpl extends Readable } } -function tryWrite( - call: Call, - serialize: (value: RequestType) => Buffer, - chunk: RequestType, - encoding: string, - cb: WriteCallback -) { - let message: Buffer; - const flags: number = Number(encoding); - try { - message = serialize(chunk); - } catch (e) { - call.cancelWithStatus(Status.INTERNAL, 'Serialization failure'); - cb(e); - return; - } - const writeObj: WriteObject = { message }; - if (!Number.isNaN(flags)) { - writeObj.flags = flags; - } - call.write(writeObj, cb); -} - export class ClientWritableStreamImpl extends Writable implements ClientWritableStream { constructor( - private readonly call: Call, + private readonly call: InterceptingCallInterface, readonly serialize: (value: RequestType) => Buffer ) { super({ objectMode: true }); @@ -157,12 +135,14 @@ export class ClientWritableStreamImpl extends Writable } _write(chunk: RequestType, encoding: string, cb: WriteCallback) { - const writeObj: WriteObject = { message: chunk }; + const context: MessageContext = { + callback: cb + } const flags: number = Number(encoding); if (!Number.isNaN(flags)) { - writeObj.flags = flags; + context.flags = flags; } - this.call.write(writeObj, cb); + this.call.sendMessageWithContext(context, chunk); } _final(cb: Function) { @@ -174,7 +154,7 @@ export class ClientWritableStreamImpl extends Writable export class ClientDuplexStreamImpl extends Duplex implements ClientDuplexStream { constructor( - private readonly call: Call, + private readonly call: InterceptingCallInterface, readonly serialize: (value: RequestType) => Buffer, readonly deserialize: (chunk: Buffer) => ResponseType ) { @@ -194,12 +174,14 @@ export class ClientDuplexStreamImpl extends Duplex } _write(chunk: RequestType, encoding: string, cb: WriteCallback) { - const writeObj: WriteObject = { message: chunk }; + const context: MessageContext = { + callback: cb + } const flags: number = Number(encoding); if (!Number.isNaN(flags)) { - writeObj.flags = flags; + context.flags = flags; } - this.call.write(writeObj, cb); + this.call.sendMessageWithContext(context, chunk); } _final(cb: Function) { diff --git a/packages/grpc-js/src/client-interceptors.ts b/packages/grpc-js/src/client-interceptors.ts index 9703db49..bf93f9be 100644 --- a/packages/grpc-js/src/client-interceptors.ts +++ b/packages/grpc-js/src/client-interceptors.ts @@ -155,8 +155,9 @@ export interface InterceptorOptions extends CallOptions { export interface InterceptingCallInterface { cancelWithStatus(status: Status, details: string): void; getPeer(): string; - start(metadata: Metadata, listener: InterceptingListener): void; + start(metadata: Metadata, listener?: Partial): void; sendMessageWithContext(context: MessageContext, message: any): void; + sendMessage(message: any): void; startRead(): void; halfClose(): void; @@ -194,18 +195,23 @@ export class InterceptingCall implements InterceptingCallInterface { getPeer() { return this.nextCall.getPeer(); } - start(metadata: Metadata, interceptingListener: InterceptingListener): void { - this.requester.start(metadata, interceptingListener, (md, listener) => { + start(metadata: Metadata, interceptingListener?: Partial): void { + const fullInterceptingListener: InterceptingListener = { + onReceiveMetadata: interceptingListener?.onReceiveMetadata?.bind(interceptingListener) ?? (metadata => {}), + onReceiveMessage: interceptingListener?.onReceiveMessage?.bind(interceptingListener) ?? (message => {}), + onReceiveStatus: interceptingListener?.onReceiveStatus?.bind(interceptingListener) ?? (status => {}) + } + this.requester.start(metadata, fullInterceptingListener, (md, listener) => { let finalInterceptingListener: InterceptingListener; if (isInterceptingListener(listener)) { finalInterceptingListener = listener; } else { const fullListener: FullListener = { - onReceiveMetadata: listener.onReceiveMetadata || defaultListener.onReceiveMetadata, - onReceiveMessage: listener.onReceiveMessage || defaultListener.onReceiveMessage, - onReceiveStatus: listener.onReceiveStatus || defaultListener.onReceiveStatus + onReceiveMetadata: listener.onReceiveMetadata ?? defaultListener.onReceiveMetadata, + onReceiveMessage: listener.onReceiveMessage ?? defaultListener.onReceiveMessage, + onReceiveStatus: listener.onReceiveStatus ?? defaultListener.onReceiveStatus }; - finalInterceptingListener = new InterceptingListenerImpl(fullListener, interceptingListener); + finalInterceptingListener = new InterceptingListenerImpl(fullListener, fullInterceptingListener); } this.nextCall.start(md, finalInterceptingListener); }); @@ -218,7 +224,7 @@ export class InterceptingCall implements InterceptingCallInterface { if (this.pendingHalfClose) { this.nextCall.halfClose(); } - }) + }); } sendMessage(message: any): void { this.sendMessageWithContext({}, message); @@ -308,17 +314,20 @@ class BaseInterceptingCall implements InterceptingCallInterface { this.call.cancelWithStatus(Status.INTERNAL, 'Serialization failure'); } } - start(metadata: Metadata, listener: InterceptingListener): void { + sendMessage(message: any) { + this.sendMessageWithContext({}, message); + } + start(metadata: Metadata, interceptingListener?: Partial): void { let readError: StatusObject | null = null; this.call.start(metadata, { onReceiveMetadata: (metadata) => { - listener.onReceiveMetadata(metadata); + interceptingListener?.onReceiveMetadata?.(metadata); }, onReceiveMessage: (message) => { let deserialized: any; try { deserialized = this.methodDefinition.responseDeserialize(message); - listener.onReceiveMessage(deserialized); + interceptingListener?.onReceiveMessage?.(deserialized); } catch (e) { readError = {code: Status.INTERNAL, details: 'Failed to parse server response', metadata: new Metadata()}; this.call.cancelWithStatus(readError.code, readError.details); @@ -326,9 +335,9 @@ class BaseInterceptingCall implements InterceptingCallInterface { }, onReceiveStatus: (status) => { if (readError) { - listener.onReceiveStatus(readError); + interceptingListener?.onReceiveStatus?.(readError); } else { - listener.onReceiveStatus(status); + interceptingListener?.onReceiveStatus?.(status); } } }); @@ -345,8 +354,22 @@ class BaseUnaryInterceptingCall extends BaseInterceptingCall implements Intercep constructor(call: Call, methodDefinition: ClientMethodDefinition) { super(call, methodDefinition); } - start(metadata: Metadata, listener: InterceptingListener): void { - super.start(metadata, listener); + start(metadata: Metadata, listener?: Partial): void { + let receivedMessage = false; + const wrapperListener: InterceptingListener = { + onReceiveMetadata: listener?.onReceiveMetadata?.bind(listener) ?? (metadata => {}), + onReceiveMessage: (message: any) => { + receivedMessage = true; + listener?.onReceiveMessage?.(message); + }, + onReceiveStatus: (status: StatusObject) => { + if (!receivedMessage) { + listener?.onReceiveMessage?.(null); + } + listener?.onReceiveStatus?.(status); + } + } + super.start(metadata, wrapperListener); this.call.startRead(); } } @@ -416,8 +439,8 @@ export function getInterceptingCall(interceptorArgs: InterceptorArguments, metho * initialValue, which is effectively at the end of the list, is a nextCall * function that invokes getBottomInterceptingCall, which handles * (de)serialization and also gets the underlying call from the channel */ - const getCall: NextCall = interceptors.reduceRight((previousValue: NextCall, currentValue: Interceptor) => { - return currentOptions => currentValue(currentOptions, previousValue); + const getCall: NextCall = interceptors.reduceRight((nextCall: NextCall, nextInterceptor: Interceptor) => { + return currentOptions => nextInterceptor(currentOptions, nextCall); }, (finalOptions: InterceptorOptions) => getBottomInterceptingCall(channel, methodDefinition.path, finalOptions, methodDefinition)); return getCall(interceptorOptions); } \ No newline at end of file diff --git a/packages/grpc-js/src/client.ts b/packages/grpc-js/src/client.ts index a7fbfa55..4d5de5af 100644 --- a/packages/grpc-js/src/client.ts +++ b/packages/grpc-js/src/client.ts @@ -29,14 +29,14 @@ import { SurfaceCall, } from './call'; import { CallCredentials } from './call-credentials'; -import { Call, Deadline, StatusObject, WriteObject, InterceptingListener } from './call-stream'; +import { Deadline, StatusObject, WriteObject, InterceptingListener } from './call-stream'; import { Channel, ConnectivityState, ChannelImplementation } from './channel'; import { ChannelCredentials } from './channel-credentials'; import { ChannelOptions } from './channel-options'; import { Status } from './constants'; import { Metadata } from './metadata'; import { ClientMethodDefinition } from './make-client'; -import { getInterceptingCall, Interceptor, InterceptorProvider, InterceptorArguments } from './client-interceptors'; +import { getInterceptingCall, Interceptor, InterceptorProvider, InterceptorArguments, InterceptingCallInterface } from './client-interceptors'; const CHANNEL_SYMBOL = Symbol(); const INTERCEPTOR_SYMBOL = Symbol(); @@ -231,13 +231,13 @@ export class Client { callInterceptors: options.interceptors || [], callInterceptorProviders: options.interceptor_providers || [] }; - const call: Call = getInterceptingCall(interceptorArgs, methodDefinition, options, this[CHANNEL_SYMBOL]); + const call: InterceptingCallInterface = getInterceptingCall(interceptorArgs, methodDefinition, options, this[CHANNEL_SYMBOL]); if (options.credentials) { call.setCredentials(options.credentials); } - const writeObj: WriteObject = { message: argument }; const emitter = new ClientUnaryCallImpl(call); let responseMessage: ResponseType | null = null; + let receivedStatus = false; call.start(metadata, { onReceiveMetadata: (metadata) => { emitter.emit('metadata', metadata); @@ -247,9 +247,12 @@ export class Client { call.cancelWithStatus(Status.INTERNAL, 'Too many responses received'); } responseMessage = message; - call.startRead(); }, onReceiveStatus(status: StatusObject) { + if (receivedStatus) { + return; + } + receivedStatus = true; if (status.code === Status.OK) { callback!(null, responseMessage!); } else { @@ -258,8 +261,8 @@ export class Client { emitter.emit('status', status); } }); - call.write(writeObj, () => {call.halfClose();}); - call.startRead(); + call.sendMessage(argument); + call.halfClose(); return emitter; } @@ -315,12 +318,13 @@ export class Client { callInterceptors: options.interceptors || [], callInterceptorProviders: options.interceptor_providers || [] }; - const call: Call = getInterceptingCall(interceptorArgs, methodDefinition, options, this[CHANNEL_SYMBOL]); + const call: InterceptingCallInterface = getInterceptingCall(interceptorArgs, methodDefinition, options, this[CHANNEL_SYMBOL]); if (options.credentials) { call.setCredentials(options.credentials); } const emitter = new ClientWritableStreamImpl(call, serialize); let responseMessage: ResponseType | null = null; + let receivedStatus = false; call.start(metadata, { onReceiveMetadata: (metadata) => { emitter.emit('metadata', metadata); @@ -330,9 +334,12 @@ export class Client { call.cancelWithStatus(Status.INTERNAL, 'Too many responses received'); } responseMessage = message; - call.startRead(); }, onReceiveStatus(status: StatusObject) { + if (receivedStatus) { + return; + } + receivedStatus = true; if (status.code === Status.OK) { callback!(null, responseMessage!); } else { @@ -341,7 +348,6 @@ export class Client { emitter.emit('status', status); } }); - call.startRead(); return emitter; } @@ -406,12 +412,12 @@ export class Client { callInterceptors: options.interceptors || [], callInterceptorProviders: options.interceptor_providers || [] }; - const call: Call = getInterceptingCall(interceptorArgs, methodDefinition, options, this[CHANNEL_SYMBOL]); + const call: InterceptingCallInterface = getInterceptingCall(interceptorArgs, methodDefinition, options, this[CHANNEL_SYMBOL]); if (options.credentials) { call.setCredentials(options.credentials); } - const writeObj: WriteObject = { message: argument }; const stream = new ClientReadableStreamImpl(call, deserialize); + let receivedStatus = false; call.start(metadata, { onReceiveMetadata(metadata: Metadata) { stream.emit('metadata', metadata); @@ -422,6 +428,10 @@ export class Client { } }, onReceiveStatus(status: StatusObject) { + if (receivedStatus) { + return; + } + receivedStatus = true; stream.push(null); if (status.code !== Status.OK) { stream.emit('error', callErrorFromStatus(status)); @@ -429,7 +439,8 @@ export class Client { stream.emit('status', status); } }); - call.write(writeObj, () => {call.halfClose();}); + call.sendMessage(argument); + call.halfClose(); return stream; } @@ -467,7 +478,7 @@ export class Client { callInterceptors: options.interceptors || [], callInterceptorProviders: options.interceptor_providers || [] }; - const call: Call = getInterceptingCall(interceptorArgs, methodDefinition, options, this[CHANNEL_SYMBOL]); + const call: InterceptingCallInterface = getInterceptingCall(interceptorArgs, methodDefinition, options, this[CHANNEL_SYMBOL]); if (options.credentials) { call.setCredentials(options.credentials); } @@ -476,6 +487,7 @@ export class Client { serialize, deserialize ); + let receivedStatus = false; call.start(metadata, { onReceiveMetadata(metadata: Metadata) { stream.emit('metadata', metadata); @@ -486,6 +498,10 @@ export class Client { } }, onReceiveStatus(status: StatusObject) { + if (receivedStatus) { + return; + } + receivedStatus = true; stream.push(null); if (status.code !== Status.OK) { stream.emit('error', callErrorFromStatus(status)); diff --git a/packages/grpc-js/src/server-call.ts b/packages/grpc-js/src/server-call.ts index 02353bbe..e92b12ba 100644 --- a/packages/grpc-js/src/server-call.ts +++ b/packages/grpc-js/src/server-call.ts @@ -351,10 +351,8 @@ export class Http2ServerCallStream< }); this.stream.once('close', () => { - if (this.stream.rstCode === http2.constants.NGHTTP2_CANCEL) { - this.cancelled = true; - this.emit('cancelled', 'cancelled'); - } + this.cancelled = true; + this.emit('cancelled', 'cancelled'); }); this.stream.on('drain', () => { @@ -362,7 +360,20 @@ export class Http2ServerCallStream< }); } + private checkCancelled(): boolean { + /* In some cases the stream can become destroyed before the close event + * fires. That creates a race condition that this check works around */ + if (this.stream.destroyed) { + this.cancelled = true; + } + return this.cancelled; + } + sendMetadata(customMetadata?: Metadata) { + if (this.checkCancelled()) { + return; + } + if (this.metadataSent) { return; } @@ -397,6 +408,13 @@ export class Http2ServerCallStream< metadata.remove(GRPC_TIMEOUT_HEADER); } + // Remove several headers that should not be propagated to the application + metadata.remove(http2.constants.HTTP2_HEADER_ACCEPT_ENCODING); + metadata.remove(http2.constants.HTTP2_HEADER_TE); + metadata.remove(http2.constants.HTTP2_HEADER_CONTENT_TYPE); + metadata.remove('grpc-encoding'); + metadata.remove('grpc-accept-encoding'); + return metadata; } @@ -450,6 +468,9 @@ export class Http2ServerCallStream< metadata?: Metadata, flags?: number ) { + if (this.checkCancelled()) { + return; + } if (!metadata) { metadata = new Metadata(); } @@ -472,7 +493,7 @@ export class Http2ServerCallStream< } sendStatus(statusObj: StatusObject) { - if (this.cancelled) { + if (this.checkCancelled()) { return; } @@ -497,6 +518,9 @@ export class Http2ServerCallStream< } sendError(error: ServerErrorResponse | ServerStatusResponse) { + if (this.checkCancelled()) { + return; + } const status: StatusObject = { code: Status.UNKNOWN, details: 'message' in error ? error.message : 'Unknown Error', @@ -522,7 +546,7 @@ export class Http2ServerCallStream< } write(chunk: Buffer) { - if (this.cancelled) { + if (this.checkCancelled()) { return; } diff --git a/packages/grpc-js/src/server.ts b/packages/grpc-js/src/server.ts index 780b8f86..90aeea20 100644 --- a/packages/grpc-js/src/server.ts +++ b/packages/grpc-js/src/server.ts @@ -346,7 +346,6 @@ export class Server { const call = new Http2ServerCallStream(stream, handler); const metadata: Metadata = call.receiveMetadata(headers) as Metadata; - switch (handler.type) { case 'unary': handleUnary(call, handler as UntypedUnaryHandler, metadata); diff --git a/test/api/client_interceptors_test.js b/test/api/client_interceptors_test.js index 9922ca97..19afd700 100644 --- a/test/api/client_interceptors_test.js +++ b/test/api/client_interceptors_test.js @@ -93,394 +93,508 @@ CallRegistry.prototype.maybeCallDone = function() { } }; -describe('Client interceptors', function() { - var echo_server; - var echo_port; - var client; +describe.only(`${anyGrpc.clientName} client -> ${anyGrpc.serverName} server`, function() { + describe('Client interceptors', function() { + var echo_server; + var echo_port; + var client; - function startServer() { - echo_server = new serverGrpc.Server(); - echo_server.addService(echo_service, { - echo: function(call, callback) { - call.sendMetadata(call.metadata); - if (call.request.value === 'error') { - var status = { - code: 2, - message: 'test status message' - }; - status.metadata = call.metadata; - callback(status, null); - return; - } - callback(null, call.request); - }, - echoClientStream: function(call, callback){ - call.sendMetadata(call.metadata); - var payload; - var err = null; - call.on('data', function(data) { - if (data.value === 'error') { - err = { - code: 2, - message: 'test status message' - }; - err.metadata = call.metadata; - return; - } - payload = data; - }); - call.on('end', function() { - callback(err, payload, call.metadata); - }); - }, - echoServerStream: function(call) { - call.sendMetadata(call.metadata); - if (call.request.value === 'error') { - var status = { - code: 2, - message: 'test status message' - }; - status.metadata = call.metadata; - call.emit('error', status); - return; - } - call.write(call.request); - call.end(call.metadata); - }, - echoBidiStream: function(call) { - call.sendMetadata(call.metadata); - call.on('data', function(data) { - if (data.value === 'error') { + function startServer(done) { + echo_server = new serverGrpc.Server(); + echo_server.addService(echo_service, { + echo: function(call, callback) { + call.sendMetadata(call.metadata); + if (call.request.value === 'error') { var status = { code: 2, message: 'test status message' }; + status.metadata = call.metadata; + callback(status, null); + return; + } + callback(null, call.request); + }, + echoClientStream: function(call, callback){ + call.sendMetadata(call.metadata); + var payload; + var err = null; + call.on('data', function(data) { + if (data.value === 'error') { + err = { + code: 2, + message: 'test status message' + }; + err.metadata = call.metadata; + return; + } + payload = data; + }); + call.on('end', function() { + callback(err, payload, call.metadata); + }); + }, + echoServerStream: function(call) { + call.sendMetadata(call.metadata); + if (call.request.value === 'error') { + var status = { + code: 2, + message: 'test status message' + }; + status.metadata = call.metadata; call.emit('error', status); return; } - call.write(data); - }); - call.on('end', function() { + call.write(call.request); call.end(call.metadata); - }); - } + }, + echoBidiStream: function(call) { + call.sendMetadata(call.metadata); + call.on('data', function(data) { + if (data.value === 'error') { + var status = { + code: 2, + message: 'test status message' + }; + call.emit('error', status); + return; + } + call.write(data); + }); + call.on('end', function() { + call.end(call.metadata); + }); + } + }); + var server_credentials = serverGrpc.ServerCredentials.createInsecure(); + echo_server.bindAsync('localhost:0', server_credentials, (error, port) => { + assert.ifError(error); + echo_port = port; + echo_server.start(); + done(); + }); + } + + function stopServer() { + echo_server.forceShutdown(); + } + + function resetClient() { + client = new EchoClient('localhost:' + echo_port, insecureCreds); + } + + before(function(done) { + startServer(done); }); - var server_credentials = serverGrpc.ServerCredentials.createInsecure(); - echo_port = echo_server.bind('localhost:0', server_credentials); - echo_server.start(); - } - - function stopServer() { - echo_server.forceShutdown(); - } - - function resetClient() { - client = new EchoClient('localhost:' + echo_port, insecureCreds); - } - - before(function() { - startServer(); - }); - beforeEach(function() { - resetClient(); - }); - after(function() { - stopServer(); - }); - - describe('execute downstream interceptors when a new call is made outbound', - function() { - var registry; - var options; - before(function() { - var stored_listener; - var stored_metadata; - var interceptor_a = function (options, nextCall) { - options.call_number = 1; - registry.addCall('construct a ' + options.call_number); - return new InterceptingCall(nextCall(options), { - start: function (metadata, listener, next) { - registry.addCall('start a ' + options.call_number); - stored_listener = listener; - stored_metadata = metadata; - next(metadata, listener); - }, - sendMessage: function (message, next) { - registry.addCall('send a ' + options.call_number); - var options2 = _.clone(options); - options2.call_number = 2; - var second_call = nextCall(options2); - second_call.start(stored_metadata); - second_call.sendMessage(message); - second_call.halfClose(); - next(message); - }, - halfClose: function (next) { - registry.addCall('close a ' + options.call_number); - next(); - } - }); - }; - - var interceptor_b = function (options, nextCall) { - registry.addCall('construct b ' + options.call_number); - return new InterceptingCall(nextCall(options), { - start: function (metadata, listener, next) { - registry.addCall('start b ' + options.call_number); - next(metadata, listener); - }, - sendMessage: function (message, next) { - registry.addCall('send b ' + options.call_number); - next(message); - }, - halfClose: function (next) { - registry.addCall('close b ' + options.call_number); - next(); - } - }); - }; - options = { - interceptors: [interceptor_a, interceptor_b] - }; - }); - var expected_calls = [ - 'construct a 1', - 'construct b 1', - 'start a 1', - 'start b 1', - 'send a 1', - 'construct b 2', - 'start b 2', - 'send b 2', - 'close b 2', - 'send b 1', - 'close a 1', - 'close b 1', - 'response' - ]; - - it('with unary call', function(done) { - registry = new CallRegistry(done, expected_calls, true); - var message = {}; - message.value = 'foo'; - client.echo(message, options, function(err, response){ - if (!err) { - registry.addCall('response'); - } - }); - }); - it('with client streaming call', function(done) { - registry = new CallRegistry(done, expected_calls, false); - var message = {}; - message.value = 'foo'; - var stream = client.echoClientStream(options, function(err, response) { - if (!err) { - registry.addCall('response'); - } - }); - stream.write(message); - stream.end(); - }); - it('with server streaming call', function(done) { - registry = new CallRegistry(done, expected_calls, true); - var message = {}; - message.value = 'foo'; - var stream = client.echoServerStream(message, options); - stream.on('data', function(data) { - registry.addCall('response'); - }); - }); - it('with bidi streaming call', function(done) { - registry = new CallRegistry( done, expected_calls, true); - var message = {}; - message.value = 'foo'; - var stream = client.echoBidiStream(options); - stream.on('data', function(data) { - registry.addCall('response'); - }); - stream.write(message); - stream.end(); - }); + beforeEach(function() { + resetClient(); + }); + after(function() { + stopServer(); }); - - describe('execute downstream interceptors when a new call is made inbound', - function() { - var registry; - var options; - before(function() { - var interceptor_a = function (options, nextCall) { - return new InterceptingCall(nextCall(options), { - start: function (metadata, listener, next) { - next(metadata, { - onReceiveMetadata: function () { }, - onReceiveMessage: function (message, next) { - registry.addCall('interceptor_a'); - var second_call = nextCall(options); - second_call.start(metadata, listener); - second_call.sendMessage(message); - second_call.halfClose(); - }, - onReceiveStatus: function () { } - }); - } - }); - }; - - var interceptor_b = function (options, nextCall) { - return new InterceptingCall(nextCall(options), { - start: function (metadata, listener, next) { - next(metadata, { - onReceiveMessage: function (message, next) { - registry.addCall('interceptor_b'); - next(message); - } - }); - } - }); - }; - - options = { - interceptors: [interceptor_a, interceptor_b] - }; - - }); - var expected_calls = ['interceptor_b', 'interceptor_a', - 'interceptor_b', 'response']; - it('with unary call', function(done) { - registry = new CallRegistry(done, expected_calls, true); - var message = {}; - message.value = 'foo'; - client.echo(message, options, function(err) { - if (!err) { - registry.addCall('response'); - } - }); - }); - it('with client streaming call', function(done) { - registry = new CallRegistry(done, expected_calls, true); - var message = {}; - message.value = 'foo'; - var stream = client.echoClientStream(options, function(err, response) { - if (!err) { - registry.addCall('response'); - } - }); - stream.write(message); - stream.end(); - }); - }); - - it.skip('will delay operations and short circuit unary requests', function(done) { - var registry = new CallRegistry(done, ['foo_miss', 'foo_hit', 'bar_miss', - 'foo_hit_done', 'foo_miss_done', 'bar_miss_done']); - var cache = {}; - var _getCachedResponse = function(value) { - return cache[value]; - }; - var _store = function(key, value) { - cache[key] = value; - }; - - var interceptor = function(options, nextCall) { - var savedMetadata; - var startNext; - var storedListener; - var storedMessage; - var messageNext; - var requester = (new RequesterBuilder()) - .withStart(function(metadata, listener, next) { - savedMetadata = metadata; - storedListener = listener; - startNext = next; - }) - .withSendMessage(function(message, next) { - storedMessage = message; - messageNext = next; - }) - .withHalfClose(function(next) { - var cachedValue = _getCachedResponse(storedMessage.value); - if (cachedValue) { - var cachedMessage = {}; - cachedMessage.value = cachedValue; - registry.addCall(storedMessage.value + '_hit'); - storedListener.onReceiveMetadata(new Metadata()); - storedListener.onReceiveMessage(cachedMessage); - storedListener.onReceiveStatus( - (new StatusBuilder()).withCode(clientGrpc.status.OK).build()); - } else { - registry.addCall(storedMessage.value + '_miss'); - var newListener = (new ListenerBuilder()).withOnReceiveMessage( - function(message, next) { - _store(storedMessage.value, message.value); + describe('execute downstream interceptors when a new call is made outbound', + function() { + var registry; + var options; + before(function() { + var interceptor_a = function (options, nextCall) { + var stored_listener; + var stored_metadata; + options.call_number = 1; + registry.addCall('construct a ' + options.call_number); + return new InterceptingCall(nextCall(options), { + start: function (metadata, listener, next) { + registry.addCall('start a ' + options.call_number); + stored_listener = listener; + stored_metadata = metadata; + next(metadata, listener); + }, + sendMessage: function (message, next) { + registry.addCall('send a ' + options.call_number); + var options2 = _.clone(options); + options2.call_number = 2; + var second_call = nextCall(options2); + second_call.start(stored_metadata); + second_call.sendMessage(message); + second_call.halfClose(); next(message); - }).build(); - startNext(savedMetadata, newListener); - messageNext(storedMessage); + }, + halfClose: function (next) { + registry.addCall('close a ' + options.call_number); + next(); + } + }); + }; + + var interceptor_b = function (options, nextCall) { + registry.addCall('construct b ' + options.call_number); + return new InterceptingCall(nextCall(options), { + start: function (metadata, listener, next) { + registry.addCall('start b ' + options.call_number); + next(metadata, listener); + }, + sendMessage: function (message, next) { + registry.addCall('send b ' + options.call_number); + next(message); + }, + halfClose: function (next) { + registry.addCall('close b ' + options.call_number); + next(); + } + }); + }; + options = { + interceptors: [interceptor_a, interceptor_b] + }; + }); + var expected_calls = [ + 'construct a 1', + 'construct b 1', + 'start a 1', + 'start b 1', + 'send a 1', + 'construct b 2', + 'start b 2', + 'send b 2', + 'close b 2', + 'send b 1', + 'close a 1', + 'close b 1', + 'response' + ]; + + it('with unary call', function(done) { + registry = new CallRegistry(done, expected_calls, true); + var message = {}; + message.value = 'foo'; + client.echo(message, options, function(err, response){ + if (!err) { + registry.addCall('response'); + } + }); + }); + it('with client streaming call', function(done) { + registry = new CallRegistry(done, expected_calls, false); + var message = {}; + message.value = 'foo'; + var stream = client.echoClientStream(options, function(err, response) { + if (!err) { + registry.addCall('response'); + } + }); + stream.write(message); + stream.end(); + }); + it('with server streaming call', function(done) { + registry = new CallRegistry(done, expected_calls, true); + var message = {}; + message.value = 'foo'; + var stream = client.echoServerStream(message, options); + stream.on('data', function(data) { + registry.addCall('response'); + }); + }); + it('with bidi streaming call', function(done) { + registry = new CallRegistry( done, expected_calls, true); + var message = {}; + message.value = 'foo'; + var stream = client.echoBidiStream(options); + stream.on('data', function(data) { + registry.addCall('response'); + }); + stream.write(message); + stream.end(); + }); + }); + + + describe('execute downstream interceptors when a new call is made inbound', + function() { + var registry; + var options; + before(function() { + var interceptor_a = function (options, nextCall) { + return new InterceptingCall(nextCall(options), { + start: function (metadata, listener, next) { + next(metadata, { + onReceiveMetadata: function (metadata, next) { }, + onReceiveMessage: function (message, next) { + registry.addCall('interceptor_a'); + var second_call = nextCall(options); + second_call.start(metadata, listener); + second_call.sendMessage(message); + second_call.halfClose(); + }, + onReceiveStatus: function (status, next) { } + }); + } + }); + }; + + var interceptor_b = function (options, nextCall) { + return new InterceptingCall(nextCall(options), { + start: function (metadata, listener, next) { + next(metadata, { + onReceiveMessage: function (message, next) { + registry.addCall('interceptor_b'); + next(message); + } + }); + } + }); + }; + + options = { + interceptors: [interceptor_a, interceptor_b] + }; + + }); + var expected_calls = ['interceptor_b', 'interceptor_a', + 'interceptor_b', 'response']; + it('with unary call', function(done) { + registry = new CallRegistry(done, expected_calls, true); + var message = {}; + message.value = 'foo'; + client.echo(message, options, function(err) { + assert.ifError(err); + registry.addCall('response'); + }); + }); + it('with client streaming call', function(done) { + registry = new CallRegistry(done, expected_calls, true); + var message = {}; + message.value = 'foo'; + var stream = client.echoClientStream(options, function(err, response) { + assert.ifError(err); + registry.addCall('response'); + }); + stream.write(message); + stream.end(); + }); + }); + + it('will delay operations and short circuit unary requests', function(done) { + var registry = new CallRegistry(done, ['foo_miss', 'foo_hit', 'bar_miss', + 'foo_hit_done', 'foo_miss_done', 'bar_miss_done']); + var cache = {}; + var _getCachedResponse = function(value) { + return cache[value]; + }; + var _store = function(key, value) { + cache[key] = value; + }; + + var interceptor = function(options, nextCall) { + var savedMetadata; + var startNext; + var storedListener; + var storedMessage; + var messageNext; + var requester = (new RequesterBuilder()) + .withStart(function(metadata, listener, next) { + savedMetadata = metadata; + storedListener = listener; + startNext = next; + }) + .withSendMessage(function(message, next) { + storedMessage = message; + messageNext = next; + }) + .withHalfClose(function(next) { + var cachedValue = _getCachedResponse(storedMessage.value); + if (cachedValue) { + var cachedMessage = {}; + cachedMessage.value = cachedValue; + registry.addCall(storedMessage.value + '_hit'); + storedListener.onReceiveMetadata(new Metadata()); + storedListener.onReceiveMessage(cachedMessage); + storedListener.onReceiveStatus( + (new StatusBuilder()).withCode(clientGrpc.status.OK).build()); + } else { + registry.addCall(storedMessage.value + '_miss'); + var newListener = (new ListenerBuilder()).withOnReceiveMessage( + function(message, next) { + _store(storedMessage.value, message.value); + next(message); + }).build(); + startNext(savedMetadata, newListener); + messageNext(storedMessage); + next(); + } + }) + .withCancel(function(message, next) { next(); - } - }) - .withCancel(function(message, next) { - next(); - }).build(); + }).build(); - return new InterceptingCall(nextCall(options), requester); - }; + return new InterceptingCall(nextCall(options), requester); + }; - var options = { - interceptors: [interceptor] - }; + var options = { + interceptors: [interceptor] + }; - var foo_message = {}; - foo_message.value = 'foo'; - client.echo(foo_message, options, function(err, response){ - assert.equal(response.value, 'foo'); - registry.addCall('foo_miss_done'); + var foo_message = {}; + foo_message.value = 'foo'; client.echo(foo_message, options, function(err, response){ assert.equal(response.value, 'foo'); - registry.addCall('foo_hit_done'); + registry.addCall('foo_miss_done'); + client.echo(foo_message, options, function(err, response){ + assert.equal(response.value, 'foo'); + registry.addCall('foo_hit_done'); + }); + }); + + var bar_message = {}; + bar_message.value = 'bar'; + client.echo(bar_message, options, function(err, response) { + assert.equal(response.value, 'bar'); + registry.addCall('bar_miss_done'); }); }); - var bar_message = {}; - bar_message.value = 'bar'; - client.echo(bar_message, options, function(err, response) { - assert.equal(response.value, 'bar'); - registry.addCall('bar_miss_done'); - }); - }); + it('can retry failed messages and handle eventual success', function(done) { + var registry = new CallRegistry(done, + ['retry_foo_1', 'retry_foo_2', 'retry_foo_3', 'foo_result', + 'retry_bar_1', 'bar_result']); + var maxRetries = 3; + var retry_interceptor = function(options, nextCall) { + var savedMetadata; + var savedSendMessage; + var savedReceiveMessage; + var savedMessageNext; + var requester = (new RequesterBuilder()) + .withStart(function(metadata, listener, next) { + savedMetadata = metadata; + var new_listener = (new ListenerBuilder()) + .withOnReceiveMessage(function(message, next) { + savedReceiveMessage = message; + savedMessageNext = next; + }) + .withOnReceiveStatus(function(status, next) { + var retries = 0; + var retry = function(message, metadata) { + retries++; + var newCall = nextCall(options); + var receivedMessage; + newCall.start(metadata, { + onReceiveMessage: function(message) { + receivedMessage = message; + }, + onReceiveStatus: function(status) { + registry.addCall('retry_' + savedMetadata.get('name') + + '_' + retries); + if (status.code !== clientGrpc.status.OK) { + if (retries <= maxRetries) { + retry(message, metadata); + } else { + savedMessageNext(receivedMessage); + next(status); + } + } else { + registry.addCall('success_call'); + var new_status = (new StatusBuilder()) + .withCode(clientGrpc.status.OK).build(); + savedMessageNext(receivedMessage); + next(new_status); + } + } + }); + newCall.sendMessage(message); + newCall.halfClose(); + }; + if (status.code !== clientGrpc.status.OK) { + // Change the message we're sending only for test purposes + // so the server will respond without error + var newMessage = (savedMetadata.get('name')[0] === 'bar') ? + {value: 'bar'} : savedSendMessage; + retry(newMessage, savedMetadata); + } else { + savedMessageNext(savedReceiveMessage); + next(status); + } + } + ).build(); + next(metadata, new_listener); + }) + .withSendMessage(function(message, next) { + savedSendMessage = message; + next(message); + }).build(); + return new InterceptingCall(nextCall(options), requester); + }; - it('can retry failed messages and handle eventual success', function(done) { - var registry = new CallRegistry(done, - ['retry_foo_1', 'retry_foo_2', 'retry_foo_3', 'foo_result', - 'retry_bar_1', 'bar_result']); - var maxRetries = 3; - var retry_interceptor = function(options, nextCall) { - var savedMetadata; - var savedSendMessage; - var savedReceiveMessage; - var savedMessageNext; - var requester = (new RequesterBuilder()) - .withStart(function(metadata, listener, next) { - savedMetadata = metadata; - var new_listener = (new ListenerBuilder()) - .withOnReceiveMessage(function(message, next) { - savedReceiveMessage = message; - savedMessageNext = next; - }) - .withOnReceiveStatus(function(status, next) { + var options = { + interceptors: [retry_interceptor] + }; + + // Make a call which the server will return a non-OK status for + var foo_message = {value: 'error'}; + var foo_metadata = new Metadata(); + foo_metadata.set('name', 'foo'); + client.echo(foo_message, foo_metadata, options, function(err, response) { + assert.strictEqual(err.code, 2); + registry.addCall('foo_result'); + }); + + // Make a call which will fail the first time and succeed on the first + // retry + var bar_message = {value: 'error'}; + var bar_metadata = new Metadata(); + bar_metadata.set('name', 'bar'); + client.echo(bar_message, bar_metadata, options, function(err, response) { + assert.strictEqual(response.value, 'bar'); + registry.addCall('bar_result'); + }); + }); + + it('can retry and preserve interceptor order on success', function(done) { + var registry = new CallRegistry(done, + ['interceptor_c', 'retry_interceptor', 'fail_call', 'interceptor_c', + 'success_call', 'interceptor_a', 'result'], true); + var interceptor_a = function(options, nextCall) { + var requester = (new RequesterBuilder()) + .withStart(function(metadata, listener, next) { + var new_listener = (new ListenerBuilder()) + .withOnReceiveMessage(function(message, next) { + registry.addCall('interceptor_a'); + next(message); + }).build(); + next(metadata, new_listener); + }).build(); + return new InterceptingCall(nextCall(options), requester); + }; + + var retry_interceptor = function(options, nextCall) { + var savedMetadata; + var savedMessage; + var savedMessageNext; + var sendMessageNext; + var originalMessage; + var startNext; + var originalListener; + var requester = (new RequesterBuilder()) + .withStart(function(metadata, listener, next) { + startNext = next; + savedMetadata = metadata; + originalListener = listener; + var new_listener = (new ListenerBuilder()) + .withOnReceiveMessage(function(message, next) { + savedMessage = message; + savedMessageNext = next; + }) + .withOnReceiveStatus(function(status, next) { var retries = 0; + var maxRetries = 1; + var receivedMessage; var retry = function(message, metadata) { retries++; - var newCall = nextCall(options); - var receivedMessage; - newCall.start(metadata, { + var new_call = nextCall(options); + new_call.start(metadata, { onReceiveMessage: function(message) { receivedMessage = message; }, onReceiveStatus: function(status) { - registry.addCall('retry_' + savedMetadata.get('name') + - '_' + retries); if (status.code !== clientGrpc.status.OK) { if (retries <= maxRetries) { retry(message, metadata); @@ -497,1281 +611,1300 @@ describe('Client interceptors', function() { } } }); - newCall.sendMessage(message); - newCall.halfClose(); + new_call.sendMessage(message); + new_call.halfClose(); }; + registry.addCall('retry_interceptor'); if (status.code !== clientGrpc.status.OK) { - // Change the message we're sending only for test purposes - // so the server will respond without error - var newMessage = (savedMetadata.get('name')[0] === 'bar') ? - {value: 'bar'} : savedSendMessage; + registry.addCall('fail_call'); + var newMessage = {value: 'foo'}; retry(newMessage, savedMetadata); - } else { - savedMessageNext(savedReceiveMessage); - next(status); - } - } - ).build(); - next(metadata, new_listener); - }) - .withSendMessage(function(message, next) { - savedSendMessage = message; - next(message); - }).build(); - return new InterceptingCall(nextCall(options), requester); - }; - - var options = { - interceptors: [retry_interceptor] - }; - - // Make a call which the server will return a non-OK status for - var foo_message = {value: 'error'}; - var foo_metadata = new Metadata(); - foo_metadata.set('name', 'foo'); - client.echo(foo_message, foo_metadata, options, function(err, response) { - assert.strictEqual(err.code, 2); - registry.addCall('foo_result'); - }); - - // Make a call which will fail the first time and succeed on the first - // retry - var bar_message = {value: 'error'}; - var bar_metadata = new Metadata(); - bar_metadata.set('name', 'bar'); - client.echo(bar_message, bar_metadata, options, function(err, response) { - assert.strictEqual(response.value, 'bar'); - registry.addCall('bar_result'); - }); - }); - - it('can retry and preserve interceptor order on success', function(done) { - var registry = new CallRegistry(done, - ['interceptor_c', 'retry_interceptor', 'fail_call', 'interceptor_c', - 'success_call', 'interceptor_a', 'result'], true); - var interceptor_a = function(options, nextCall) { - var requester = (new RequesterBuilder()) - .withStart(function(metadata, listener, next) { - var new_listener = (new ListenerBuilder()) - .withOnReceiveMessage(function(message, next) { - registry.addCall('interceptor_a'); - next(message); - }).build(); - next(metadata, new_listener); - }).build(); - return new InterceptingCall(nextCall(options), requester); - }; - - var retry_interceptor = function(options, nextCall) { - var savedMetadata; - var savedMessage; - var savedMessageNext; - var sendMessageNext; - var originalMessage; - var startNext; - var originalListener; - var requester = (new RequesterBuilder()) - .withStart(function(metadata, listener, next) { - startNext = next; - savedMetadata = metadata; - originalListener = listener; - var new_listener = (new ListenerBuilder()) - .withOnReceiveMessage(function(message, next) { - savedMessage = message; - savedMessageNext = next; - }) - .withOnReceiveStatus(function(status, next) { - var retries = 0; - var maxRetries = 1; - var receivedMessage; - var retry = function(message, metadata) { - retries++; - var new_call = nextCall(options); - new_call.start(metadata, { - onReceiveMessage: function(message) { - receivedMessage = message; - }, - onReceiveStatus: function(status) { - if (status.code !== clientGrpc.status.OK) { - if (retries <= maxRetries) { - retry(message, metadata); - } else { - savedMessageNext(receivedMessage); - next(status); - } - } else { - registry.addCall('success_call'); - var new_status = (new StatusBuilder()) - .withCode(clientGrpc.status.OK).build(); - savedMessageNext(receivedMessage); - next(new_status); - } - } - }); - new_call.sendMessage(message); - new_call.halfClose(); - }; - registry.addCall('retry_interceptor'); - if (status.code !== clientGrpc.status.OK) { - registry.addCall('fail_call'); - var newMessage = {value: 'foo'}; - retry(newMessage, savedMetadata); - } else { - savedMessageNext(savedMessage); - next(status); - } - }).build(); - next(metadata, new_listener); - }) - .withSendMessage(function(message, next) { - sendMessageNext = next; - originalMessage = message; - next(message); - }) - .build(); - return new InterceptingCall(nextCall(options), requester); - }; - - var interceptor_c = function(options, nextCall) { - var requester = (new RequesterBuilder()) - .withStart(function(metadata, listener, next) { - var new_listener = (new ListenerBuilder()) - .withOnReceiveMessage(function(message, next) { - registry.addCall('interceptor_c'); - next(message); - }).build(); - next(metadata, new_listener); - }).build(); - return new InterceptingCall(nextCall(options), requester); - }; - - var options = { - interceptors: [interceptor_a, retry_interceptor, interceptor_c] - }; - - var message = {value: 'error'}; - client.echo(message, options, function(err, response) { - assert.strictEqual(response.value, 'foo'); - registry.addCall('result'); - }); - }); - - describe('handle interceptor errors', function (doneOuter) { - var options; - before(function () { - var foo_interceptor = function (options, nextCall) { - var savedListener; - var outbound = (new RequesterBuilder()) - .withStart(function (metadata, listener, next) { - savedListener = listener; - next(metadata, listener); - }) - .withSendMessage(function (message, next) { - savedListener.onReceiveMetadata(new Metadata()); - savedListener.onReceiveMessage({ value: 'failed' }); - var error_status = (new StatusBuilder()) - .withCode(16) - .withDetails('Error in foo interceptor') - .build(); - savedListener.onReceiveStatus(error_status); - }).build(); - return new InterceptingCall(nextCall(options), outbound); - }; - options = { interceptors: [foo_interceptor] }; - }); - it('with unary call', function(done) { - var message = {}; - client.echo(message, options, function(err, response) { - assert.strictEqual(err.code, 16); - assert.strictEqual(err.message, - '16 UNAUTHENTICATED: Error in foo interceptor'); - done(); - doneOuter(); - }); - }); - }); - - describe('implement fallbacks for streaming RPCs', function() { - - var options; - before(function () { - var fallback_response = { value: 'fallback' }; - var savedMessage; - var savedMessageNext; - var interceptor = function (options, nextCall) { - var requester = (new RequesterBuilder()) - .withStart(function (metadata, listener, next) { - var new_listener = (new ListenerBuilder()) - .withOnReceiveMessage(function (message, next) { - savedMessage = message; - savedMessageNext = next; - }) - .withOnReceiveStatus(function (status, next) { - if (status.code !== clientGrpc.status.OK) { - savedMessageNext(fallback_response); - next((new StatusBuilder()).withCode(clientGrpc.status.OK)); } else { savedMessageNext(savedMessage); next(status); } }).build(); next(metadata, new_listener); + }) + .withSendMessage(function(message, next) { + sendMessageNext = next; + originalMessage = message; + next(message); + }) + .build(); + return new InterceptingCall(nextCall(options), requester); + }; + + var interceptor_c = function(options, nextCall) { + var requester = (new RequesterBuilder()) + .withStart(function(metadata, listener, next) { + var new_listener = (new ListenerBuilder()) + .withOnReceiveMessage(function(message, next) { + registry.addCall('interceptor_c'); + next(message); + }).build(); + next(metadata, new_listener); }).build(); return new InterceptingCall(nextCall(options), requester); }; - options = { - interceptors: [interceptor] + + var options = { + interceptors: [interceptor_a, retry_interceptor, interceptor_c] }; - }); - it('with client streaming call', function (done) { - var registry = new CallRegistry(done, ['foo_result', 'fallback_result']); - var stream = client.echoClientStream(options, function (err, response) { - assert.strictEqual(response.value, 'foo'); - registry.addCall('foo_result'); - }); - stream.write({ value: 'foo' }); - stream.end(); - stream = client.echoClientStream(options, function(err, response) { + var message = {value: 'error'}; + client.echo(message, options, function(err, response) { assert.ifError(err); - assert.strictEqual(response.value, 'fallback'); - registry.addCall('fallback_result'); + assert.strictEqual(response.value, 'foo'); + registry.addCall('result'); }); - stream.write({value: 'error'}); - stream.end(); }); - }); - describe('allows the call options to be modified for downstream interceptors', - function() { - var done; + describe('handle interceptor errors', function () { + var options; + before(function () { + var foo_interceptor = function (options, nextCall) { + var savedListener; + var outbound = (new RequesterBuilder()) + .withStart(function (metadata, listener, next) { + savedListener = listener; + next(metadata, listener); + }) + .withSendMessage(function (message, next) { + savedListener.onReceiveMetadata(new Metadata()); + savedListener.onReceiveMessage({ value: 'failed' }); + var error_status = (new StatusBuilder()) + .withCode(16) + .withDetails('Error in foo interceptor') + .build(); + savedListener.onReceiveStatus(error_status); + }).build(); + return new InterceptingCall(nextCall(options), outbound); + }; + options = { interceptors: [foo_interceptor] }; + }); + it('with unary call', function(done) { + var message = {}; + client.echo(message, options, function(err, response) { + assert.strictEqual(err.code, 16); + assert.strictEqual(err.message, + '16 UNAUTHENTICATED: Error in foo interceptor'); + done(); + }); + }); + }); + + describe('implement fallbacks for streaming RPCs', function() { + + var options; + before(function () { + var fallback_response = { value: 'fallback' }; + var interceptor = function (options, nextCall) { + var savedMessage; + var savedMessageNext; + var requester = (new RequesterBuilder()) + .withStart(function (metadata, listener, next) { + var new_listener = (new ListenerBuilder()) + .withOnReceiveMessage(function (message, next) { + savedMessage = message; + savedMessageNext = next; + }) + .withOnReceiveStatus(function (status, next) { + if (status.code !== clientGrpc.status.OK) { + savedMessageNext(fallback_response); + next((new StatusBuilder()).withCode(clientGrpc.status.OK).build()); + } else { + savedMessageNext(savedMessage); + next(status); + } + }).build(); + next(metadata, new_listener); + }).build(); + return new InterceptingCall(nextCall(options), requester); + }; + options = { + interceptors: [interceptor] + }; + }); + it('with client streaming call', function (done) { + var registry = new CallRegistry(done, ['foo_result', 'fallback_result']); + var stream = client.echoClientStream(options, function (err, response) { + assert.ifError(err); + assert.strictEqual(response.value, 'foo'); + registry.addCall('foo_result'); + }); + stream.write({ value: 'foo' }); + stream.end(); + + stream = client.echoClientStream(options, function(err, response) { + assert.ifError(err); + assert.strictEqual(response.value, 'fallback'); + registry.addCall('fallback_result'); + }); + stream.write({value: 'error'}); + stream.end(); + }); + }); + + describe('allows the call options to be modified for downstream interceptors', + function() { + var done; + var options; + var method_name; + var method_path_last; + before(function() { + var interceptor_a = function (options, nextCall) { + options.deadline = 10; + return new InterceptingCall(nextCall(options)); + }; + var interceptor_b = function (options, nextCall) { + assert.equal(options.method_definition.path, '/EchoService/' + + method_path_last); + assert.equal(options.deadline, 10); + done(); + return new InterceptingCall(nextCall(options)); + }; + + options = { + interceptors: [interceptor_a, interceptor_b], + deadline: 100 + }; + }); + + it('with unary call', function(cb) { + done = cb; + var metadata = new Metadata(); + var message = {}; + method_name = 'echo'; + method_path_last = 'Echo'; + + client.echo(message, metadata, options, function(){}); + }); + + it('with client streaming call', function(cb) { + done = cb; + var metadata = new Metadata(); + method_name = 'echoClientStream'; + method_path_last = 'EchoClientStream'; + + client.echoClientStream(metadata, options, function() {}); + }); + + it('with server streaming call', function(cb) { + done = cb; + var metadata = new Metadata(); + var message = {}; + method_name = 'echoServerStream'; + method_path_last = 'EchoServerStream'; + + const stream = client.echoServerStream(message, metadata, options); + stream.on('error', () => {}); + }); + + it('with bidi streaming call', function(cb) { + done = cb; + var metadata = new Metadata(); + method_name = 'echoBidiStream'; + method_path_last = 'EchoBidiStream'; + + const stream = client.echoBidiStream(metadata, options); + stream.on('error', () => {}); + }); + }); + + describe('pass accurate MethodDefinitions', function() { + var registry; + var initial_value = 'broken'; + var expected_value = 'working'; + var options; + before(function() { + var interceptor = function (options, nextCall) { + registry.addCall({ + path: options.method_definition.path, + requestStream: options.method_definition.requestStream, + responseStream: options.method_definition.responseStream + }); + var outbound = (new RequesterBuilder()) + .withSendMessage(function (message, next) { + message.value = expected_value; + next(message); + }).build(); + return new InterceptingCall(nextCall(options), outbound); + }; + options = { interceptors: [interceptor] }; + }); + + it('with unary call', function(done) { + var unary_definition = { + path: '/EchoService/Echo', + requestStream: false, + responseStream: false + }; + registry = new CallRegistry(done, [ + unary_definition, + 'result_unary' + ]); + + var metadata = new Metadata(); + + var message = {value: initial_value}; + + client.echo(message, metadata, options, function(err, response){ + assert.equal(response.value, expected_value); + registry.addCall('result_unary'); + }); + + }); + it('with client streaming call', function(done) { + var client_stream_definition = { + path: '/EchoService/EchoClientStream', + requestStream: true, + responseStream: false + }; + registry = new CallRegistry(done, [ + client_stream_definition, + 'result_client_stream' + ]); + var metadata = new Metadata(); + var message = {value: initial_value}; + var client_stream = client.echoClientStream(metadata, options, + function(err, response) { + assert.strictEqual(response.value, expected_value); + registry.addCall('result_client_stream'); + }); + client_stream.write(message); + client_stream.end(); + + }); + it('with server streaming call', function(done) { + var server_stream_definition = { + path: '/EchoService/EchoServerStream', + responseStream: true, + requestStream: false, + }; + registry = new CallRegistry(done, [ + server_stream_definition, + 'result_server_stream' + ]); + + var metadata = new Metadata(); + var message = {value: initial_value}; + var server_stream = client.echoServerStream(message, metadata, options); + server_stream.on('data', function(data) { + assert.strictEqual(data.value, expected_value); + registry.addCall('result_server_stream'); + }); + + }); + it('with bidi streaming call', function(done) { + var bidi_stream_definition = { + path: '/EchoService/EchoBidiStream', + requestStream: true, + responseStream: true + }; + registry = new CallRegistry(done, [ + bidi_stream_definition, + 'result_bidi_stream' + ]); + + var metadata = new Metadata(); + var message = {value: initial_value}; + var bidi_stream = client.echoBidiStream(metadata, options); + bidi_stream.on('data', function(data) { + assert.strictEqual(data.value, expected_value); + registry.addCall('result_bidi_stream'); + }); + bidi_stream.write(message); + bidi_stream.end(); + }); + }); + + it('uses interceptors passed to the client constructor', function(done) { + var registry = new CallRegistry(done, { + 'constructor_interceptor_a_echo': 1, + 'constructor_interceptor_b_echoServerStream': 1, + 'invocation_interceptor': 1, + 'result_unary': 1, + 'result_stream': 1, + 'result_invocation': 1, + 'status_stream': 1 + }); + + var constructor_interceptor_a = function(options, nextCall) { + var outbound = (new RequesterBuilder()) + .withStart(function(metadata, listener, next) { + registry.addCall('constructor_interceptor_a_echo'); + next(metadata, listener); + }).build(); + return new InterceptingCall(nextCall(options), outbound); + }; + var constructor_interceptor_b = function(options, nextCall) { + var outbound = (new RequesterBuilder()) + .withStart(function(metadata, listener, next) { + registry.addCall('constructor_interceptor_b_echoServerStream'); + next(metadata, listener); + }).build(); + return new InterceptingCall(nextCall(options), outbound); + }; + var invocation_interceptor = function(options, nextCall) { + var outbound = (new RequesterBuilder()) + .withStart(function(metadata, listener, next) { + registry.addCall('invocation_interceptor'); + next(metadata, listener); + }).build(); + return new InterceptingCall(nextCall(options), outbound); + }; + + var interceptor_providers = [ + function(method_definition) { + if (!method_definition.requestStream && + !method_definition.responseStream) { + return constructor_interceptor_a; + } + }, + function(method_definition) { + if (!method_definition.requestStream && + method_definition.responseStream) { + return constructor_interceptor_b; + } + } + ]; + var constructor_options = { + interceptor_providers: interceptor_providers + }; + var int_client = new EchoClient('localhost:' + echo_port, insecureCreds, + constructor_options); + var message = {}; + int_client.echo(message, function(error, response) { + assert.ifError(error); + registry.addCall('result_unary'); + }); + var stream = int_client.echoServerStream(message); + stream.on('data', function() { + registry.addCall('result_stream'); + }); + stream.on('status', (status) => { + registry.addCall('status_stream'); + }); + stream.on('error', (error) => { + assert.ifError(error); + }); + + var options = { interceptors: [invocation_interceptor] }; + int_client.echo(message, options, function() { + registry.addCall('result_invocation'); + }); + }); + + it.skip('will reject conflicting interceptor options at invocation', + function(done) { + try { + client.echo('message', { + interceptors: [], + interceptor_providers: [] + }, function () {}); + } catch (e) { + assert.equal(e.name, 'InterceptorConfigurationError'); + done(); + } + }); + + it('will resolve interceptor providers at invocation', function(done) { + var constructor_interceptor = function(options, nextCall) { + var outbound = (new RequesterBuilder()) + .withStart(function() { + assert(false); + }).build(); + return new InterceptingCall(nextCall(options), outbound); + }; + var invocation_interceptor = function(options, nextCall) { + var outbound = (new RequesterBuilder()) + .withStart(function() { + done(); + }).build(); + return new InterceptingCall(nextCall(options), outbound); + }; + var constructor_interceptor_providers = [ + function() { + return constructor_interceptor; + } + ]; + var invocation_interceptor_providers = [ + function() { + return invocation_interceptor; + } + ]; + var constructor_options = { + interceptor_providers: constructor_interceptor_providers + }; + var int_client = new EchoClient('localhost:' + echo_port, insecureCreds, + constructor_options); + var message = {}; + var options = { interceptor_providers: invocation_interceptor_providers }; + int_client.echo(message, options, function() {}); + }); + + describe('trigger a stack of interceptors in nested order', function() { + var registry; + var expected_calls = ['constructA', 'constructB', 'outboundA', 'outboundB', + 'inboundB', 'inboundA', 'callDone']; var options; - var method_name; - var method_path_last; before(function() { var interceptor_a = function (options, nextCall) { - options.deadline = 10; - return new InterceptingCall(nextCall(options)); + registry.addCall('constructA'); + var outbound = (new RequesterBuilder()) + .withStart(function (metadata, listener, next) { + registry.addCall('outboundA'); + var new_listener = (new ListenerBuilder()).withOnReceiveMessage( + function (message, next) { + registry.addCall('inboundA'); + next(message); + }).build(); + next(metadata, new_listener); + }).build(); + return new clientGrpc.InterceptingCall(nextCall(options), + outbound); }; var interceptor_b = function (options, nextCall) { - assert.equal(options.method_definition.path, '/EchoService/' + - method_path_last); - assert.equal(options.deadline, 10); - done(); - return new InterceptingCall(nextCall(options)); + registry.addCall('constructB'); + var outbound = (new RequesterBuilder()) + .withStart(function (metadata, listener, next) { + registry.addCall('outboundB'); + var new_listener = (new ListenerBuilder()).withOnReceiveMessage( + function (message, next) { + registry.addCall('inboundB'); + next(message); + }).build(); + next(metadata, new_listener); + }).build(); + return new InterceptingCall(nextCall(options), + outbound); }; + options = { interceptors: [interceptor_a, interceptor_b] }; + }); + var metadata = new Metadata(); + var message = {}; - options = { - interceptors: [interceptor_a, interceptor_b], - deadline: 100 + it('with unary call', function(done) { + registry = new CallRegistry(done, expected_calls, true); + client.echo(message, metadata, options, function(error, response){ + assert.ifError(error); + registry.addCall('callDone'); + }); + }); + + it('with client streaming call', function(done) { + registry = new CallRegistry(done, expected_calls, true); + var client_stream = client.echoClientStream(metadata, options, function(error, response) { + assert.ifError(error); + registry.addCall('callDone'); + }); + client_stream.write(message); + client_stream.end(); + }); + + it('with server streaming call', function(done) { + registry = new CallRegistry(done, expected_calls, true); + var stream = client.echoServerStream(message, metadata, options); + stream.on('data', function() {}); + stream.on('status', (status) => { + assert.strictEqual(status.code, clientGrpc.status.OK); + registry.addCall('callDone'); + }); + stream.on('error', (error) => { + assert.ifError(error); + }); + }); + + it('with bidi streaming call', function(done) { + registry = new CallRegistry(done, expected_calls, true); + var bidi_stream = client.echoBidiStream(metadata, options); + bidi_stream.on('data', function(){}); + bidi_stream.on('status', (status) => { + assert.strictEqual(status.code, clientGrpc.status.OK); + registry.addCall('callDone'); + }); + bidi_stream.on('error', (error) => { + assert.ifError(error); + }); + bidi_stream.write(message); + bidi_stream.end(); + }); + }); + + describe('trigger interceptors horizontally', function() { + var expected_calls = [ + 'interceptor_a_start', + 'interceptor_b_start', + 'interceptor_a_send', + 'interceptor_b_send', + 'call_end' + ]; + var registry; + var options; + var metadata = new Metadata(); + var message = {}; + + before(function() { + var interceptor_a = function (options, nextCall) { + var outbound = (new RequesterBuilder()) + .withStart(function (metadata, listener, next) { + registry.addCall('interceptor_a_start'); + next(metadata, listener); + }) + .withSendMessage(function (message, next) { + registry.addCall('interceptor_a_send'); + next(message); + }).build(); + return new InterceptingCall(nextCall(options), + outbound); }; + var interceptor_b = function (options, nextCall) { + var outbound = (new RequesterBuilder()) + .withStart(function (metadata, listener, next) { + registry.addCall('interceptor_b_start'); + next(metadata, listener); + }) + .withSendMessage(function (message, next) { + registry.addCall('interceptor_b_send'); + next(message); + }).build(); + return new InterceptingCall(nextCall(options), + outbound); + }; + options = { interceptors: [interceptor_a, interceptor_b] }; + }); + + it('with unary call', function(done) { + registry = new CallRegistry(done, expected_calls, true); + client.echo(message, metadata, options, function(error, response){ + assert.ifError(error); + registry.addCall('call_end'); + }); + }); + + it('with client streaming call', function(done) { + registry = new CallRegistry(done, expected_calls, true); + var client_stream = client.echoClientStream(metadata, options, function(error, response) { + assert.ifError(error); + registry.addCall('call_end'); + }); + client_stream.write(message); + client_stream.end(); + }); + + it('with server streaming call', function(done) { + registry = new CallRegistry(done, expected_calls, true); + var stream = client.echoServerStream(message, metadata, options); + stream.on('data', function() {}); + stream.on('status', (status) => { + assert.strictEqual(status.code, clientGrpc.status.OK); + registry.addCall('call_end'); + }); + stream.on('error', (error) => { + assert.ifError(error); + }); + }); + + it('with bidi streaming call', function(done) { + registry = new CallRegistry(done, expected_calls, true); + var bidi_stream = client.echoBidiStream(metadata, options); + bidi_stream.on('data', function(){}); + bidi_stream.on('status', (status) => { + assert.strictEqual(status.code, clientGrpc.status.OK); + registry.addCall('call_end'); + }); + bidi_stream.on('error', (error) => { + assert.ifError(error); + }); + bidi_stream.write(message); + bidi_stream.end(); + }); + }); + + describe('trigger when sending metadata', function() { + var registry; + + var message = {}; + var key_names = ['original', 'foo', 'bar']; + var keys = { + original: 'originalkey', + foo: 'fookey', + bar: 'barkey' + }; + var values = { + original: 'originalvalue', + foo: 'foovalue', + bar: 'barvalue' + }; + var expected_calls = ['foo', 'bar', 'response', 'end']; + var options; + before(function () { + var foo_interceptor = function (options, nextCall) { + var outbound = (new RequesterBuilder()) + .withStart(function (metadata, listener, next) { + metadata.add(keys.foo, values.foo); + registry.addCall('foo'); + next(metadata, listener); + }).build(); + return new InterceptingCall(nextCall(options), outbound); + }; + var bar_interceptor = function (options, nextCall) { + var outbound = (new RequesterBuilder()) + .withStart(function (metadata, listener, next) { + metadata.add(keys.bar, values.bar); + registry.addCall('bar'); + next(metadata, listener); + }).build(); + return new InterceptingCall(nextCall(options), outbound); + }; + options = { interceptors: [foo_interceptor, bar_interceptor] }; + }); + + it('with unary call', function (done) { + registry = new CallRegistry(done, expected_calls, true); + var metadata = new Metadata(); + metadata.add(keys.original, values.original); + + var unary_call = client.echo(message, metadata, options, function (error, response) { + assert.ifError(error); + registry.addCall('end'); + }); + unary_call.on('metadata', function (metadata) { + var has_expected_values = _.every(key_names, function (key_name) { + return _.isEqual(metadata.get(keys[key_name]), [values[key_name]]); + }); + assert(has_expected_values); + registry.addCall('response'); + }); + }); + it('with client streaming call', function(done) { + registry = new CallRegistry(done, expected_calls, true); + var metadata = new Metadata(); + metadata.add(keys.original, values.original); + + var client_stream = client.echoClientStream(metadata, options, + function (error, response) { + assert.ifError(error); + registry.addCall('end'); + }); + client_stream.write(message); + client_stream.on('metadata', function (metadata) { + var has_expected_values = _.every(key_names, function (key_name) { + return _.isEqual(metadata.get(keys[key_name]), [values[key_name]]); + }); + assert(has_expected_values); + registry.addCall('response'); + }); + client_stream.end(); + }); + it('with server streaming call', function(done) { + registry = new CallRegistry(done, expected_calls, true); + var metadata = new Metadata(); + metadata.add(keys.original, values.original); + var server_stream = client.echoServerStream(message, metadata, options); + server_stream.on('metadata', function (metadata) { + var has_expected_values = _.every(key_names, function (key_name) { + return _.isEqual(metadata.get(keys[key_name]), [values[key_name]]); + }); + assert(has_expected_values); + registry.addCall('response'); + }); + server_stream.on('data', function() { }); + server_stream.on('status', (status) => { + assert.strictEqual(status.code, clientGrpc.status.OK); + registry.addCall('end'); + }); + server_stream.on('error', (error) => { + assert.ifError(error); + }); + }); + it('with bidi streaming call', function(done) { + registry = new CallRegistry(done, expected_calls, true); + var metadata = new Metadata(); + metadata.add(keys.original, values.original); + var bidi_stream = client.echoBidiStream(metadata, options); + bidi_stream.on('metadata', function(metadata) { + var has_expected_values = _.every(key_names, function(key_name) { + return _.isEqual(metadata.get(keys[key_name]),[values[key_name]]); + }); + assert(has_expected_values); + bidi_stream.end(); + registry.addCall('response'); + }); + bidi_stream.on('data', function() { }); + bidi_stream.on('status', (status) => { + assert.strictEqual(status.code, clientGrpc.status.OK); + registry.addCall('end'); + }); + bidi_stream.on('error', (error) => { + assert.ifError(error); + }); + bidi_stream.write(message); + bidi_stream.end(); + }); + }); + + describe('trigger when sending messages', function() { + var registry; + var originalValue = 'foo'; + var expectedValue = 'bar'; + var options; + var metadata = new Metadata(); + var expected_calls = ['messageIntercepted', 'response', 'end']; + + before(function() { + var foo_interceptor = function (options, nextCall) { + var outbound = (new RequesterBuilder()) + .withSendMessage(function (message, next) { + assert.strictEqual(message.value, originalValue); + registry.addCall('messageIntercepted'); + next({ value: expectedValue }); + }).build(); + return new InterceptingCall(nextCall(options), + outbound); + }; + options = { interceptors: [foo_interceptor] }; + }); + + it('with unary call', function(done) { + registry = new CallRegistry(done, expected_calls, true); + var message = {value: originalValue}; + + client.echo(message, metadata, options, function (err, response) { + assert.ifError(err); + assert.strictEqual(response.value, expectedValue); + registry.addCall('response'); + registry.addCall('end'); + }); + }); + it('with client streaming call', function(done) { + registry = new CallRegistry(done, expected_calls, true, true); + var message = {value: originalValue}; + var client_stream = client.echoClientStream(metadata, options, + function (err, response) { + assert.ifError(err); + assert.strictEqual(response.value, expectedValue); + registry.addCall('response'); + registry.addCall('end'); + }); + client_stream.write(message); + client_stream.end(); + }); + it('with server streaming call', function(done) { + registry = new CallRegistry(done, expected_calls, true); + var message = {value: originalValue}; + var server_stream = client.echoServerStream(message, metadata, options); + server_stream.on('data', function (data) { + assert.strictEqual(data.value, expectedValue); + registry.addCall('response'); + }); + server_stream.on('status', (status) => { + assert.strictEqual(status.code, clientGrpc.status.OK); + registry.addCall('end'); + }); + server_stream.on('error', (error) => { + assert.ifError(error); + }); + }); + it('with bidi streaming call', function(done) { + registry = new CallRegistry(done, expected_calls, true); + var message = {value: originalValue}; + var bidi_stream = client.echoBidiStream(metadata, options); + bidi_stream.on('data', function(data) { + assert.strictEqual(data.value, expectedValue); + registry.addCall('response'); + }); + bidi_stream.on('status', (status) => { + assert.strictEqual(status.code, clientGrpc.status.OK); + registry.addCall('end'); + }); + bidi_stream.on('error', (error) => { + assert.ifError(error); + }); + bidi_stream.write(message); + bidi_stream.end(); + }); + }); + + describe('trigger when client closes the call', function() { + var registry; + var expected_calls = [ + 'response', 'halfClose', 'end' + ]; + var message = {}; + var options; + before(function() { + var foo_interceptor = function (options, nextCall) { + var outbound = (new RequesterBuilder()) + .withHalfClose(function (next) { + registry.addCall('halfClose'); + next(); + }).build(); + return new InterceptingCall(nextCall(options), + outbound); + }; + options = { interceptors: [foo_interceptor] }; + }); + it('with unary call', function (done) { + registry = new CallRegistry(done, expected_calls); + client.echo(message, options, function (err, response) { + assert.ifError(err); + registry.addCall('response'); + registry.addCall('end'); + }); + }); + it('with client streaming call', function (done) { + registry = new CallRegistry(done, expected_calls); + var client_stream = client.echoClientStream(options, + function (err, response) { + assert.ifError(err); + registry.addCall('response'); + registry.addCall('end'); + }); + client_stream.write(message); + client_stream.end(); + }); + it('with server streaming call', function (done) { + registry = new CallRegistry(done, expected_calls); + var server_stream = client.echoServerStream(message, options); + server_stream.on('data', function (data) { + registry.addCall('response'); + }); + server_stream.on('status', (status) => { + assert.strictEqual(status.code, clientGrpc.status.OK); + registry.addCall('end'); + }); + server_stream.on('error', (error) => { + assert.ifError(error); + }); + }); + it('with bidi streaming call', function (done) { + registry = new CallRegistry(done, expected_calls); + var bidi_stream = client.echoBidiStream(options); + bidi_stream.on('data', function (data) { + registry.addCall('response'); + }); + bidi_stream.on('status', (status) => { + assert.strictEqual(status.code, clientGrpc.status.OK); + registry.addCall('end'); + }); + bidi_stream.on('error', (error) => { + assert.ifError(error); + }); + bidi_stream.write(message); + bidi_stream.end(); + }); + }); + + describe('trigger when the stream is canceled', function() { + var done; + var message = {}; + var options; + before(function() { + var foo_interceptor = function (options, nextCall) { + var outbound = (new RequesterBuilder()) + .withCancel(function (next) { + done(); + next(); + }).build(); + return new InterceptingCall(nextCall(options), + outbound); + }; + options = { interceptors: [foo_interceptor] }; }); it('with unary call', function(cb) { done = cb; - var metadata = new Metadata(); - var message = {}; - method_name = 'echo'; - method_path_last = 'Echo'; - - client.echo(message, metadata, options, function(){}); + var stream = client.echo(message, options, function() {}); + stream.cancel(); }); it('with client streaming call', function(cb) { done = cb; - var metadata = new Metadata(); - method_name = 'echoClientStream'; - method_path_last = 'EchoClientStream'; - - client.echoClientStream(metadata, options, function() {}); + var stream = client.echoClientStream(options, function() {}); + stream.cancel(); }); - it('with server streaming call', function(cb) { done = cb; - var metadata = new Metadata(); - var message = {}; - method_name = 'echoServerStream'; - method_path_last = 'EchoServerStream'; - - client.echoServerStream(message, metadata, options); + var stream = client.echoServerStream(message, options); + stream.on('error', (error) => { + assert.strictEqual(error.code, clientGrpc.status.CANCELLED); + }); + stream.cancel(); }); - it('with bidi streaming call', function(cb) { done = cb; - var metadata = new Metadata(); - method_name = 'echoBidiStream'; - method_path_last = 'EchoBidiStream'; - - client.echoBidiStream(metadata, options); - }); - }); - - describe('pass accurate MethodDefinitions', function() { - var registry; - var initial_value = 'broken'; - var expected_value = 'working'; - var options; - before(function() { - var interceptor = function (options, nextCall) { - registry.addCall({ - path: options.method_definition.path, - requestStream: options.method_definition.requestStream, - responseStream: options.method_definition.responseStream + var stream = client.echoBidiStream(options); + stream.on('error', (error) => { + assert.strictEqual(error.code, clientGrpc.status.CANCELLED); }); - var outbound = (new RequesterBuilder()) - .withSendMessage(function (message, next) { - message.value = expected_value; - next(message); - }).build(); - return new InterceptingCall(nextCall(options), outbound); - }; - options = { interceptors: [interceptor] }; - }); - - it('with unary call', function(done) { - var unary_definition = { - path: '/EchoService/Echo', - requestStream: false, - responseStream: false - }; - registry = new CallRegistry(done, [ - unary_definition, - 'result_unary' - ]); - - var metadata = new Metadata(); - - var message = {value: initial_value}; - - client.echo(message, metadata, options, function(err, response){ - assert.equal(response.value, expected_value); - registry.addCall('result_unary'); - }); - - }); - it('with client streaming call', function(done) { - - var client_stream_definition = { - path: '/EchoService/EchoClientStream', - requestStream: true, - responseStream: false - }; - registry = new CallRegistry(done, [ - client_stream_definition, - 'result_client_stream' - ], false, true); - var metadata = new Metadata(); - var message = {value: initial_value}; - var client_stream = client.echoClientStream(metadata, options, - function(err, response) { - assert.strictEqual(response.value, expected_value); - registry.addCall('result_client_stream'); - }); - client_stream.write(message); - client_stream.end(); - - }); - it('with server streaming call', function(done) { - var server_stream_definition = { - path: '/EchoService/EchoServerStream', - responseStream: true, - requestStream: false, - }; - registry = new CallRegistry(done, [ - server_stream_definition, - 'result_server_stream' - ]); - - var metadata = new Metadata(); - var message = {value: initial_value}; - var server_stream = client.echoServerStream(message, metadata, options); - server_stream.on('data', function(data) { - assert.strictEqual(data.value, expected_value); - registry.addCall('result_server_stream'); - }); - - }); - it('with bidi streaming call', function(done) { - var bidi_stream_definition = { - path: '/EchoService/EchoBidiStream', - requestStream: true, - responseStream: true - }; - registry = new CallRegistry(done, [ - bidi_stream_definition, - 'result_bidi_stream' - ]); - - var metadata = new Metadata(); - var message = {value: initial_value}; - var bidi_stream = client.echoBidiStream(metadata, options); - bidi_stream.on('data', function(data) { - assert.strictEqual(data.value, expected_value); - registry.addCall('result_bidi_stream'); - }); - bidi_stream.write(message); - bidi_stream.end(); - }); - }); - - it('uses interceptors passed to the client constructor', function(done) { - var registry = new CallRegistry(done, { - 'constructor_interceptor_a_echo': 1, - 'constructor_interceptor_b_echoServerStream': 1, - 'invocation_interceptor': 1, - 'result_unary': 1, - 'result_stream': 1, - 'result_invocation': 1 - }); - - var constructor_interceptor_a = function(options, nextCall) { - var outbound = (new RequesterBuilder()) - .withStart(function(metadata, listener, next) { - registry.addCall('constructor_interceptor_a_echo'); - next(metadata, listener); - }).build(); - return new InterceptingCall(nextCall(options), outbound); - }; - var constructor_interceptor_b = function(options, nextCall) { - var outbound = (new RequesterBuilder()) - .withStart(function(metadata, listener, next) { - registry.addCall('constructor_interceptor_b_echoServerStream'); - next(metadata, listener); - }).build(); - return new InterceptingCall(nextCall(options), outbound); - }; - var invocation_interceptor = function(options, nextCall) { - var outbound = (new RequesterBuilder()) - .withStart(function(metadata, listener, next) { - registry.addCall('invocation_interceptor'); - next(metadata, listener); - }).build(); - return new InterceptingCall(nextCall(options), outbound); - }; - - var interceptor_providers = [ - function(method_definition) { - if (!method_definition.requestStream && - !method_definition.responseStream) { - return constructor_interceptor_a; - } - }, - function(method_definition) { - if (!method_definition.requestStream && - method_definition.responseStream) { - return constructor_interceptor_b; - } - } - ]; - var constructor_options = { - interceptor_providers: interceptor_providers - }; - var int_client = new EchoClient('localhost:' + echo_port, insecureCreds, - constructor_options); - var message = {}; - int_client.echo(message, function() { - registry.addCall('result_unary'); - }); - var stream = int_client.echoServerStream(message); - stream.on('data', function() { - registry.addCall('result_stream'); - }); - - var options = { interceptors: [invocation_interceptor] }; - int_client.echo(message, options, function() { - registry.addCall('result_invocation'); - }); - }); - - it.skip('will reject conflicting interceptor options at invocation', - function(done) { - try { - client.echo('message', { - interceptors: [], - interceptor_providers: [] - }, function () {}); - } catch (e) { - assert.equal(e.name, 'InterceptorConfigurationError'); - done(); - } - }); - - it('will resolve interceptor providers at invocation', function(done) { - var constructor_interceptor = function(options, nextCall) { - var outbound = (new RequesterBuilder()) - .withStart(function() { - assert(false); - }).build(); - return new InterceptingCall(nextCall(options), outbound); - }; - var invocation_interceptor = function(options, nextCall) { - var outbound = (new RequesterBuilder()) - .withStart(function() { - done(); - }).build(); - return new InterceptingCall(nextCall(options), outbound); - }; - var constructor_interceptor_providers = [ - function() { - return constructor_interceptor; - } - ]; - var invocation_interceptor_providers = [ - function() { - return invocation_interceptor; - } - ]; - var constructor_options = { - interceptor_providers: constructor_interceptor_providers - }; - var int_client = new EchoClient('localhost:' + echo_port, insecureCreds, - constructor_options); - var message = {}; - var options = { interceptor_providers: invocation_interceptor_providers }; - int_client.echo(message, options, function() {}); - }); - - describe('trigger a stack of interceptors in nested order', function() { - var registry; - var expected_calls = ['constructA', 'constructB', 'outboundA', 'outboundB', - 'inboundB', 'inboundA']; - var options; - before(function() { - var interceptor_a = function (options, nextCall) { - registry.addCall('constructA'); - var outbound = (new RequesterBuilder()) - .withStart(function (metadata, listener, next) { - registry.addCall('outboundA'); - var new_listener = (new ListenerBuilder()).withOnReceiveMessage( - function (message, next) { - registry.addCall('inboundA'); - next(message); - }).build(); - next(metadata, new_listener); - }).build(); - return new clientGrpc.InterceptingCall(nextCall(options), - outbound); - }; - var interceptor_b = function (options, nextCall) { - registry.addCall('constructB'); - var outbound = (new RequesterBuilder()) - .withStart(function (metadata, listener, next) { - registry.addCall('outboundB'); - var new_listener = (new ListenerBuilder()).withOnReceiveMessage( - function (message, next) { - registry.addCall('inboundB'); - next(message); - }).build(); - next(metadata, new_listener); - }).build(); - return new InterceptingCall(nextCall(options), - outbound); - }; - options = { interceptors: [interceptor_a, interceptor_b] }; - }); - var metadata = new Metadata(); - var message = {}; - - it('with unary call', function(done) { - registry = new CallRegistry(done, expected_calls, true); - client.echo(message, metadata, options, function(){}); - }); - - it('with client streaming call', function(done) { - registry = new CallRegistry(done, expected_calls, true); - var client_stream = client.echoClientStream(metadata, options, - function() {}); - client_stream.write(message); - client_stream.end(); - }); - - it('with server streaming call', function(done) { - registry = new CallRegistry(done, expected_calls, true); - var stream = client.echoServerStream(message, metadata, options); - stream.on('data', function() {}); - }); - - it('with bidi streaming call', function(done) { - registry = new CallRegistry(done, expected_calls, true); - var bidi_stream = client.echoBidiStream(metadata, options); - bidi_stream.on('data', function(){}); - bidi_stream.write(message); - bidi_stream.end(); - }); - }); - - describe('trigger interceptors horizontally', function() { - var expected_calls = [ - 'interceptor_a_start', - 'interceptor_b_start', - 'interceptor_a_send', - 'interceptor_b_send' - ]; - var registry; - var options; - var metadata = new Metadata(); - var message = {}; - - before(function() { - var interceptor_a = function (options, nextCall) { - var outbound = (new RequesterBuilder()) - .withStart(function (metadata, listener, next) { - registry.addCall('interceptor_a_start'); - next(metadata, listener); - }) - .withSendMessage(function (message, next) { - registry.addCall('interceptor_a_send'); - next(message); - }).build(); - return new InterceptingCall(nextCall(options), - outbound); - }; - var interceptor_b = function (options, nextCall) { - var outbound = (new RequesterBuilder()) - .withStart(function (metadata, listener, next) { - registry.addCall('interceptor_b_start'); - next(metadata, listener); - }) - .withSendMessage(function (message, next) { - registry.addCall('interceptor_b_send'); - next(message); - }).build(); - return new InterceptingCall(nextCall(options), - outbound); - }; - options = { interceptors: [interceptor_a, interceptor_b] }; - }); - - it('with unary call', function(done) { - registry = new CallRegistry(done, expected_calls, true); - client.echo(message, metadata, options, function(){}); - }); - - it('with client streaming call', function(done) { - registry = new CallRegistry(done, expected_calls, true); - var client_stream = client.echoClientStream(metadata, options, - function() {}); - client_stream.write(message); - client_stream.end(); - }); - - it('with server streaming call', function(done) { - registry = new CallRegistry(done, expected_calls, true); - var stream = client.echoServerStream(message, metadata, options); - stream.on('data', function() {}); - }); - - it('with bidi streaming call', function(done) { - registry = new CallRegistry(done, expected_calls, true); - var bidi_stream = client.echoBidiStream(metadata, options); - bidi_stream.on('data', function(){}); - bidi_stream.write(message); - bidi_stream.end(); - }); - }); - - describe('trigger when sending metadata', function() { - var registry; - - var message = {}; - var key_names = ['original', 'foo', 'bar']; - var keys = { - original: 'originalkey', - foo: 'fookey', - bar: 'barkey' - }; - var values = { - original: 'originalvalue', - foo: 'foovalue', - bar: 'barvalue' - }; - var expected_calls = ['foo', 'bar', 'response']; - var options; - before(function () { - var foo_interceptor = function (options, nextCall) { - var outbound = (new RequesterBuilder()) - .withStart(function (metadata, listener, next) { - metadata.add(keys.foo, values.foo); - registry.addCall('foo'); - next(metadata, listener); - }).build(); - return new InterceptingCall(nextCall(options), outbound); - }; - var bar_interceptor = function (options, nextCall) { - var outbound = (new RequesterBuilder()) - .withStart(function (metadata, listener, next) { - metadata.add(keys.bar, values.bar); - registry.addCall('bar'); - next(metadata, listener); - }).build(); - return new InterceptingCall(nextCall(options), outbound); - }; - options = { interceptors: [foo_interceptor, bar_interceptor] }; - }); - - it('with unary call', function (done) { - registry = new CallRegistry(done, expected_calls, true); - var metadata = new Metadata(); - metadata.add(keys.original, values.original); - - var unary_call = client.echo(message, metadata, options, function () {}); - unary_call.on('metadata', function (metadata) { - var has_expected_values = _.every(key_names, function (key_name) { - return _.isEqual(metadata.get(keys[key_name]), [values[key_name]]); - }); - assert(has_expected_values); - registry.addCall('response'); - }); - }); - it('with client streaming call', function(done) { - registry = new CallRegistry(done, expected_calls, true); - var metadata = new Metadata(); - metadata.add(keys.original, values.original); - - var client_stream = client.echoClientStream(metadata, options, - function () { - }); - client_stream.write(message); - client_stream.on('metadata', function (metadata) { - var has_expected_values = _.every(key_names, function (key_name) { - return _.isEqual(metadata.get(keys[key_name]), [values[key_name]]); - }); - assert(has_expected_values); - registry.addCall('response'); - }); - client_stream.end(); - }); - it('with server streaming call', function(done) { - registry = new CallRegistry(done, expected_calls, true); - var metadata = new Metadata(); - metadata.add(keys.original, values.original); - var server_stream = client.echoServerStream(message, metadata, options); - server_stream.on('metadata', function (metadata) { - var has_expected_values = _.every(key_names, function (key_name) { - return _.isEqual(metadata.get(keys[key_name]), [values[key_name]]); - }); - assert(has_expected_values); - registry.addCall('response'); - }); - server_stream.on('data', function() { }); - }); - it('with bidi streaming call', function(done) { - registry = new CallRegistry(done, expected_calls, true); - var metadata = new Metadata(); - metadata.add(keys.original, values.original); - var bidi_stream = client.echoBidiStream(metadata, options); - bidi_stream.on('metadata', function(metadata) { - var has_expected_values = _.every(key_names, function(key_name) { - return _.isEqual(metadata.get(keys[key_name]),[values[key_name]]); - }); - assert(has_expected_values); - bidi_stream.end(); - registry.addCall('response'); - }); - bidi_stream.on('data', function() { }); - bidi_stream.write(message); - bidi_stream.end(); - }); - }); - - describe('trigger when sending messages', function() { - var registry; - var originalValue = 'foo'; - var expectedValue = 'bar'; - var options; - var metadata = new Metadata(); - var expected_calls = ['messageIntercepted', 'response']; - - before(function() { - var foo_interceptor = function (options, nextCall) { - var outbound = (new RequesterBuilder()) - .withSendMessage(function (message, next) { - assert.strictEqual(message.value, originalValue); - registry.addCall('messageIntercepted'); - next({ value: expectedValue }); - }).build(); - return new InterceptingCall(nextCall(options), - outbound); - }; - options = { interceptors: [foo_interceptor] }; - }); - - it('with unary call', function(done) { - registry = new CallRegistry(done, expected_calls, true); - var message = {value: originalValue}; - - client.echo(message, metadata, options, function (err, response) { - assert.strictEqual(response.value, expectedValue); - registry.addCall('response'); - }); - }); - it('with client streaming call', function(done) { - registry = new CallRegistry(done, expected_calls, true); - var message = {value: originalValue}; - var client_stream = client.echoClientStream(metadata, options, - function (err, response) { - assert.strictEqual(response.value, expectedValue); - registry.addCall('response'); - }); - client_stream.write(message); - client_stream.end(); - }); - it('with server streaming call', function(done) { - registry = new CallRegistry(done, expected_calls, true); - var message = {value: originalValue}; - var server_stream = client.echoServerStream(message, metadata, options); - server_stream.on('data', function (data) { - assert.strictEqual(data.value, expectedValue); - registry.addCall('response'); - }); - }); - it('with bidi streaming call', function(done) { - registry = new CallRegistry(done, expected_calls, true); - var message = {value: originalValue}; - var bidi_stream = client.echoBidiStream(metadata, options); - bidi_stream.on('data', function(data) { - assert.strictEqual(data.value, expectedValue); - registry.addCall('response'); - }); - bidi_stream.write(message); - bidi_stream.end(); - }); - }); - - describe('trigger when client closes the call', function() { - var registry; - var expected_calls = [ - 'response', 'halfClose' - ]; - var message = {}; - var options; - before(function() { - var foo_interceptor = function (options, nextCall) { - var outbound = (new RequesterBuilder()) - .withHalfClose(function (next) { - registry.addCall('halfClose'); - next(); - }).build(); - return new InterceptingCall(nextCall(options), - outbound); - }; - options = { interceptors: [foo_interceptor] }; - }); - it('with unary call', function (done) { - registry = new CallRegistry(done, expected_calls); - client.echo(message, options, function (err, response) { - if (!err) { - registry.addCall('response'); - } - }); - }); - it('with client streaming call', function (done) { - registry = new CallRegistry(done, expected_calls); - var client_stream = client.echoClientStream(options, - function (err, response) { }); - client_stream.write(message, function (err) { - if (!err) { - registry.addCall('response'); - } - }); - client_stream.end(); - }); - it('with server streaming call', function (done) { - registry = new CallRegistry(done, expected_calls); - var server_stream = client.echoServerStream(message, options); - server_stream.on('data', function (data) { - registry.addCall('response'); - }); - }); - it('with bidi streaming call', function (done) { - registry = new CallRegistry(done, expected_calls); - var bidi_stream = client.echoBidiStream(options); - bidi_stream.on('data', function (data) { - registry.addCall('response'); - }); - bidi_stream.write(message); - bidi_stream.end(); - }); - }); - - describe('trigger when the stream is canceled', function() { - var done; - var message = {}; - var options; - before(function() { - var foo_interceptor = function (options, nextCall) { - var outbound = (new RequesterBuilder()) - .withCancel(function (next) { - done(); - next(); - }).build(); - return new InterceptingCall(nextCall(options), - outbound); - }; - options = { interceptors: [foo_interceptor] }; - }); - - it('with unary call', function(cb) { - done = cb; - var stream = client.echo(message, options, function() {}); - stream.cancel(); - }); - - it('with client streaming call', function(cb) { - done = cb; - var stream = client.echoClientStream(options, function() {}); - stream.cancel(); - }); - it('with server streaming call', function(cb) { - done = cb; - var stream = client.echoServerStream(message, options); - stream.cancel(); - }); - it('with bidi streaming call', function(cb) { - done = cb; - var stream = client.echoBidiStream(options); - stream.cancel(); - }); - }); - - describe('trigger when receiving metadata', function() { - var message = {}; - var expectedKey = 'foo'; - var expectedValue = 'bar'; - var options; - before(function() { - var foo_interceptor = function (options, nextCall) { - var outbound = (new RequesterBuilder()) - .withStart(function (metadata, listener, next) { - var new_listener = (new ListenerBuilder()).withOnReceiveMetadata( - function (metadata, next) { - metadata.add(expectedKey, expectedValue); - next(metadata); - }).build(); - next(metadata, new_listener); - }).build(); - return new InterceptingCall(nextCall(options), outbound); - }; - options = { interceptors: [foo_interceptor] }; - }); - - it('with unary call', function(done) { - var metadata = new Metadata(); - var unary_call = client.echo(message, metadata, options, function () {}); - unary_call.on('metadata', function (metadata) { - assert.strictEqual(metadata.get(expectedKey)[0], expectedValue); - done(); - }); - }); - it('with client streaming call', function(done) { - var metadata = new Metadata(); - var client_stream = client.echoClientStream(metadata, options, - function () {}); - client_stream.write(message); - client_stream.on('metadata', function (metadata) { - assert.strictEqual(metadata.get(expectedKey)[0], expectedValue); - done(); - }); - client_stream.end(); - }); - it('with server streaming call', function(done) { - var metadata = new Metadata(); - var server_stream = client.echoServerStream(message, metadata, options); - server_stream.on('metadata', function (metadata) { - assert.strictEqual(metadata.get(expectedKey)[0], expectedValue); - done(); - }); - server_stream.on('data', function() { }); - }); - it('with bidi streaming call', function(done) { - var metadata = new Metadata(); - var bidi_stream = client.echoBidiStream(metadata, options); - bidi_stream.on('metadata', function(metadata) { - assert.strictEqual(metadata.get(expectedKey)[0], expectedValue); - bidi_stream.end(); - done(); - }); - bidi_stream.on('data', function() { }); - bidi_stream.write(message); - }); - }); - - describe('trigger when sending messages', function() { - var originalValue = 'foo'; - var expectedValue = 'bar'; - var options; - var metadata = new Metadata(); - before(function() { - var foo_interceptor = function (options, nextCall) { - var outbound = (new RequesterBuilder()) - .withStart(function (metadata, listener, next) { - var new_listener = (new ListenerBuilder()).withOnReceiveMessage( - function (message, next) { - if (!message) { - next(message); - return; - } - assert.strictEqual(message.value, originalValue); - message.value = expectedValue; - next(message); - }).build(); - next(metadata, new_listener); - }).build(); - return new InterceptingCall(nextCall(options), - outbound); - }; - options = { interceptors: [foo_interceptor] }; - }); - - it('with unary call', function (done) { - var message = { value: originalValue }; - client.echo(message, metadata, options, function (err, response) { - assert.strictEqual(response.value, expectedValue); - done(); - }); - }); - it('with client streaming call', function (done) { - var message = { value: originalValue }; - var client_stream = client.echoClientStream(metadata, options, - function (err, response) { - assert.strictEqual(response.value, expectedValue); - done(); - }); - client_stream.write(message); - client_stream.end(); - }); - it('with server streaming call', function (done) { - var message = { value: originalValue }; - var server_stream = client.echoServerStream(message, metadata, options); - server_stream.on('data', function (data) { - assert.strictEqual(data.value, expectedValue); - done(); - }); - }); - it('with bidi streaming call', function (done) { - var message = { value: originalValue }; - var bidi_stream = client.echoBidiStream(metadata, options); - bidi_stream.on('data', function (data) { - assert.strictEqual(data.value, expectedValue); - done(); - }); - bidi_stream.write(message); - bidi_stream.end(); - }); - }); - - describe('trigger when receiving status', function() { - var expectedStatus = 'foo'; - var options; - var metadata = new Metadata(); - before(function() { - var foo_interceptor = function (options, nextCall) { - var outbound = (new RequesterBuilder()) - .withStart(function (metadata, listener, next) { - var new_listener = (new ListenerBuilder()).withOnReceiveStatus( - function (status, next) { - assert.strictEqual(status.code, 2); - assert.strictEqual(status.details, 'test status message'); - var new_status = { - code: 1, - details: expectedStatus, - metadata: {} - }; - next(new_status); - }).build(); - next(metadata, new_listener); - }).build(); - return new InterceptingCall(nextCall(options), - outbound); - }; - options = { interceptors: [foo_interceptor] }; - }); - it('with unary call', function (done) { - var message = { value: 'error' }; - var unary_call = client.echo(message, metadata, options, function () { - }); - unary_call.on('status', function (status) { - assert.strictEqual(status.code, 1); - assert.strictEqual(status.details, expectedStatus); - done(); - }); - }); - it('with client streaming call', function (done) { - var message = { value: 'error' }; - var client_stream = client.echoClientStream(metadata, options, - function () { - }); - client_stream.on('status', function (status) { - assert.strictEqual(status.code, 1); - assert.strictEqual(status.details, expectedStatus); - done(); - }); - client_stream.write(message); - client_stream.end(); - }); - - it('with server streaming call', function(done) { - var message = {value: 'error'}; - var server_stream = client.echoServerStream(message, metadata, options); - server_stream.on('error', function (err) { - }); - server_stream.on('data', function (data) { - }); - server_stream.on('status', function (status) { - assert.strictEqual(status.code, 1); - assert.strictEqual(status.details, expectedStatus); - done(); + stream.cancel(); }); }); - it('with bidi streaming call', function(done) { - var message = {value: 'error'}; - var bidi_stream = client.echoBidiStream(metadata, options); - bidi_stream.on('error', function(err) {}); - bidi_stream.on('data', function(data) {}); - bidi_stream.on('status', function(status) { - assert.strictEqual(status.code, 1); - assert.strictEqual(status.details, expectedStatus); - done(); - }); - bidi_stream.write(message); - bidi_stream.end(); - }); - }); - describe('delay streaming headers', function() { - var options; - var metadata = new Metadata(); - before(function() { - var foo_interceptor = function (options, nextCall) { - var startNext; - var startListener; - var startMetadata; - var methods = { - start: function (metadata, listener, next) { - startNext = next; - startListener = listener; - startMetadata = metadata; - }, - sendMessage: function (message, next) { - startMetadata.set('fromMessage', message.value); - startNext(startMetadata, startListener); - next(message); - } + describe('trigger when receiving metadata', function() { + var message = {}; + var expectedKey = 'foo'; + var expectedValue = 'bar'; + var options; + before(function() { + var foo_interceptor = function (options, nextCall) { + var outbound = (new RequesterBuilder()) + .withStart(function (metadata, listener, next) { + var new_listener = (new ListenerBuilder()).withOnReceiveMetadata( + function (metadata, next) { + metadata.add(expectedKey, expectedValue); + next(metadata); + }).build(); + next(metadata, new_listener); + }).build(); + return new InterceptingCall(nextCall(options), outbound); }; - return new InterceptingCall(nextCall(options), methods); - }; - options = { interceptors: [foo_interceptor] }; + options = { interceptors: [foo_interceptor] }; + }); + + it('with unary call', function(done) { + var metadata = new Metadata(); + var unary_call = client.echo(message, metadata, options, function () {}); + unary_call.on('metadata', function (metadata) { + assert.strictEqual(metadata.get(expectedKey)[0], expectedValue); + done(); + }); + }); + it('with client streaming call', function(done) { + var metadata = new Metadata(); + var client_stream = client.echoClientStream(metadata, options, + function () {}); + client_stream.write(message); + client_stream.on('metadata', function (metadata) { + assert.strictEqual(metadata.get(expectedKey)[0], expectedValue); + done(); + }); + client_stream.end(); + }); + it('with server streaming call', function(done) { + var metadata = new Metadata(); + var server_stream = client.echoServerStream(message, metadata, options); + server_stream.on('metadata', function (metadata) { + assert.strictEqual(metadata.get(expectedKey)[0], expectedValue); + done(); + }); + server_stream.on('error', (error) => { + assert.ifError(error); + }); + server_stream.on('data', function() { }); + }); + it('with bidi streaming call', function(done) { + var metadata = new Metadata(); + var bidi_stream = client.echoBidiStream(metadata, options); + bidi_stream.on('metadata', function(metadata) { + assert.strictEqual(metadata.get(expectedKey)[0], expectedValue); + bidi_stream.end(); + done(); + }); + bidi_stream.on('data', function() { }); + bidi_stream.on('error', (error) => { + assert.ifError(error); + }) + bidi_stream.write(message); + }); }); - it('with client streaming call', function (done) { - var message = { value: 'foo' }; - var client_stream = client.echoClientStream(metadata, options, - function () { }); - client_stream.on('metadata', function (metadata) { - assert.equal(metadata.get('fromMessage'), 'foo'); - done(); + describe('trigger when sending messages', function() { + var originalValue = 'foo'; + var expectedValue = 'bar'; + var options; + var metadata = new Metadata(); + before(function() { + var foo_interceptor = function (options, nextCall) { + var outbound = (new RequesterBuilder()) + .withStart(function (metadata, listener, next) { + var new_listener = (new ListenerBuilder()).withOnReceiveMessage( + function (message, next) { + if (!message) { + next(message); + return; + } + assert.strictEqual(message.value, originalValue); + message.value = expectedValue; + next(message); + }).build(); + next(metadata, new_listener); + }).build(); + return new InterceptingCall(nextCall(options), + outbound); + }; + options = { interceptors: [foo_interceptor] }; }); - client_stream.write(message); - client_stream.end(); - }); - it('with bidi streaming call', function (done) { - var message = { value: 'foo' }; - var bidi_stream = client.echoBidiStream(metadata, options); - bidi_stream.on('metadata', function (metadata) { - assert.equal(metadata.get('fromMessage'), 'foo'); - done(); - }); - bidi_stream.write(message); - bidi_stream.end(); - }); - }); - describe.only('order of operations enforced for async interceptors', function() { - it('with unary call', function(done) { - var expected_calls = [ - 'close_b', - 'message_b', - 'start_b', - 'done' - ]; - var registry = new CallRegistry(done, expected_calls, true, true); - var message = {value: 'foo'}; - var interceptor_a = function(options, nextCall) { - return new InterceptingCall(nextCall(options), { - start: function(metadata, listener, next) { - setTimeout(function() { next(metadata, listener); }, 50); - }, - sendMessage: function(message, next) { - setTimeout(function () { next(message); }, 10); - } + it('with unary call', function (done) { + var message = { value: originalValue }; + client.echo(message, metadata, options, function (err, response) { + assert.ifError(err); + assert.strictEqual(response.value, expectedValue); + done(); }); - }; - var interceptor_b = function(options, nextCall) { - return new InterceptingCall(nextCall(options), { - start: function(metadata, listener, next) { - registry.addCall('start_b'); - next(metadata, listener); - }, - sendMessage: function(message, next) { - registry.addCall('message_b'); - next(message); - }, - halfClose: function(next) { - registry.addCall('close_b'); - next(); - } + }); + it('with client streaming call', function (done) { + var message = { value: originalValue }; + var client_stream = client.echoClientStream(metadata, options, + function (err, response) { + assert.ifError(err); + assert.strictEqual(response.value, expectedValue); + done(); + }); + client_stream.write(message); + client_stream.end(); + }); + it('with server streaming call', function (done) { + var message = { value: originalValue }; + var server_stream = client.echoServerStream(message, metadata, options); + server_stream.on('data', function (data) { + assert.strictEqual(data.value, expectedValue); + done(); }); - }; - var options = { - interceptors: [interceptor_a, interceptor_b] - }; - client.echo(message, options, function(err, response) { - assert.strictEqual(err, null); - registry.addCall('done'); + server_stream.on('error', (error) => { + assert.ifError(error); + }); + }); + it('with bidi streaming call', function (done) { + var message = { value: originalValue }; + var bidi_stream = client.echoBidiStream(metadata, options); + bidi_stream.on('data', function (data) { + assert.strictEqual(data.value, expectedValue); + done(); + }); + bidi_stream.on('error', (error) => { + assert.ifError(error); + }); + bidi_stream.write(message); + bidi_stream.end(); }); }); - it('with serverStreaming call', function(done) { - var expected_calls = [ - 'close_b', - 'message_b', - 'start_b', - 'done' - ]; - var registry = new CallRegistry(done, expected_calls, true, true); - var message = {value: 'foo'}; - var interceptor_a = function(options, nextCall) { - return new InterceptingCall(nextCall(options), { - start: function(metadata, listener, next) { - setTimeout(function() { next(metadata, listener); }, 50); - }, - sendMessage: function(message, next) { - setTimeout(function () { next(message); }, 10); - } + + describe('trigger when receiving status', function() { + var expectedStatus = 'foo'; + var options; + var metadata = new Metadata(); + before(function() { + var foo_interceptor = function (options, nextCall) { + var outbound = (new RequesterBuilder()) + .withStart(function (metadata, listener, next) { + var new_listener = (new ListenerBuilder()).withOnReceiveStatus( + function (status, next) { + assert.strictEqual(status.code, 2); + assert.strictEqual(status.details, 'test status message'); + var new_status = { + code: 1, + details: expectedStatus, + metadata: {} + }; + next(new_status); + }).build(); + next(metadata, new_listener); + }).build(); + return new InterceptingCall(nextCall(options), + outbound); + }; + options = { interceptors: [foo_interceptor] }; + }); + it('with unary call', function (done) { + var message = { value: 'error' }; + var unary_call = client.echo(message, metadata, options, function () { }); - }; - var interceptor_b = function(options, nextCall) { - return new InterceptingCall(nextCall(options), { - start: function(metadata, listener, next) { - registry.addCall('start_b'); - next(metadata, listener); - }, - sendMessage: function(message, next) { - registry.addCall('message_b'); - next(message); - }, - halfClose: function(next) { - registry.addCall('close_b'); - next(); - } + unary_call.on('status', function (status) { + assert.strictEqual(status.code, 1); + assert.strictEqual(status.details, expectedStatus); + done(); + }); + }); + it('with client streaming call', function (done) { + var message = { value: 'error' }; + var client_stream = client.echoClientStream(metadata, options, + function () { + }); + client_stream.on('status', function (status) { + assert.strictEqual(status.code, 1); + assert.strictEqual(status.details, expectedStatus); + done(); + }); + client_stream.write(message); + client_stream.end(); + }); + + it('with server streaming call', function(done) { + var message = {value: 'error'}; + var server_stream = client.echoServerStream(message, metadata, options); + server_stream.on('error', function (err) { + }); + server_stream.on('data', function (data) { + }); + server_stream.on('status', function (status) { + assert.strictEqual(status.code, 1); + assert.strictEqual(status.details, expectedStatus); + done(); + }); + }); + + it('with bidi streaming call', function(done) { + var message = {value: 'error'}; + var bidi_stream = client.echoBidiStream(metadata, options); + bidi_stream.on('error', function(err) {}); + bidi_stream.on('data', function(data) {}); + bidi_stream.on('status', function(status) { + assert.strictEqual(status.code, 1); + assert.strictEqual(status.details, expectedStatus); + done(); + }); + bidi_stream.write(message); + bidi_stream.end(); + }); + }); + describe('delay streaming headers', function() { + var options; + var metadata = new Metadata(); + before(function() { + var foo_interceptor = function (options, nextCall) { + var startNext; + var startListener; + var startMetadata; + var methods = { + start: function (metadata, listener, next) { + startNext = next; + startListener = listener; + startMetadata = metadata; + }, + sendMessage: function (message, next) { + startMetadata.set('fromMessage', message.value); + startNext(startMetadata, startListener); + next(message); + } + }; + return new InterceptingCall(nextCall(options), methods); + }; + options = { interceptors: [foo_interceptor] }; + }); + + it('with client streaming call', function (done) { + var message = { value: 'foo' }; + var client_stream = client.echoClientStream(metadata, options, + function (error, response) { + assert.ifError(error); + done(); + }); + client_stream.on('metadata', function (metadata) { + assert.equal(metadata.get('fromMessage'), 'foo'); + }); + client_stream.write(message); + client_stream.end(); + }); + it('with bidi streaming call', function (done) { + var message = { value: 'foo' }; + var bidi_stream = client.echoBidiStream(metadata, options); + bidi_stream.on('metadata', function (metadata) { + assert.equal(metadata.get('fromMessage'), 'foo'); + }); + bidi_stream.on('data', () => {}); + bidi_stream.on('status', (status) => { + assert.strictEqual(status.code, clientGrpc.status.OK); + done(); + }); + bidi_stream.on('error', (error) => { + assert.ifError(error); + }); + bidi_stream.write(message); + bidi_stream.end(); + }); + }); + + describe.skip('order of operations enforced for async interceptors', function() { + it('with unary call', function(done) { + var expected_calls = [ + 'close_b', + 'message_b', + 'start_b', + 'done' + ]; + var registry = new CallRegistry(done, expected_calls, true); + var message = {value: 'foo'}; + var interceptor_a = function(options, nextCall) { + return new InterceptingCall(nextCall(options), { + start: function(metadata, listener, next) { + setTimeout(function() { next(metadata, listener); }, 50); + }, + sendMessage: function(message, next) { + setTimeout(function () { next(message); }, 10); + } + }); + }; + var interceptor_b = function(options, nextCall) { + return new InterceptingCall(nextCall(options), { + start: function(metadata, listener, next) { + registry.addCall('start_b'); + next(metadata, listener); + }, + sendMessage: function(message, next) { + registry.addCall('message_b'); + next(message); + }, + halfClose: function(next) { + registry.addCall('close_b'); + next(); + } + }); + }; + var options = { + interceptors: [interceptor_a, interceptor_b] + }; + client.echo(message, options, function(err, response) { + assert.strictEqual(err, null); + registry.addCall('done'); + }); + }); + it('with serverStreaming call', function(done) { + var expected_calls = [ + 'close_b', + 'message_b', + 'start_b', + 'done' + ]; + var registry = new CallRegistry(done, expected_calls, true); + var message = {value: 'foo'}; + var interceptor_a = function(options, nextCall) { + return new InterceptingCall(nextCall(options), { + start: function(metadata, listener, next) { + setTimeout(function() { next(metadata, listener); }, 50); + }, + sendMessage: function(message, next) { + setTimeout(function () { next(message); }, 10); + } + }); + }; + var interceptor_b = function(options, nextCall) { + return new InterceptingCall(nextCall(options), { + start: function(metadata, listener, next) { + registry.addCall('start_b'); + next(metadata, listener); + }, + sendMessage: function(message, next) { + registry.addCall('message_b'); + next(message); + }, + halfClose: function(next) { + registry.addCall('close_b'); + next(); + } + }); + }; + var options = { + interceptors: [interceptor_a, interceptor_b] + }; + var stream = client.echoServerStream(message, options); + stream.on('data', function(response) { + assert.strictEqual(response.value, 'foo'); + registry.addCall('done'); }); - }; - var options = { - interceptors: [interceptor_a, interceptor_b] - }; - var stream = client.echoServerStream(message, options); - stream.on('data', function(response) { - assert.strictEqual(response.value, 'foo'); - registry.addCall('done'); }); }); });