From acbf17d9b7f4158a7feea015205727143abf46d5 Mon Sep 17 00:00:00 2001 From: murgatroid99 Date: Fri, 25 Oct 2019 10:26:58 -0700 Subject: [PATCH] Add client interceptors --- packages/grpc-js/src/call-stream.ts | 57 +- packages/grpc-js/src/call.ts | 14 +- packages/grpc-js/src/client-interceptors.ts | 423 +++++ packages/grpc-js/src/client.ts | 149 +- packages/grpc-js/src/index.ts | 32 +- packages/grpc-js/src/make-client.ts | 16 +- packages/grpc-native-core/deps/grpc | 2 +- test/api/client_interceptors_test.js | 1778 +++++++++++++++++++ 8 files changed, 2365 insertions(+), 106 deletions(-) create mode 100644 packages/grpc-js/src/client-interceptors.ts create mode 100644 test/api/client_interceptors_test.js diff --git a/packages/grpc-js/src/call-stream.ts b/packages/grpc-js/src/call-stream.ts index 50ad7e34..23823f96 100644 --- a/packages/grpc-js/src/call-stream.ts +++ b/packages/grpc-js/src/call-stream.ts @@ -86,20 +86,40 @@ export interface InterceptingListener { onReceiveStatus(status: StatusObject): void; } -class InterceptingListenerImpl implements InterceptingListener { +export function isInterceptingListener(listener: Listener | InterceptingListener): listener is InterceptingListener { + return listener.onReceiveMetadata !== undefined && listener.onReceiveMetadata.length === 1; +} + +export class InterceptingListenerImpl implements InterceptingListener { + private processingMessage = false; + private pendingStatus: StatusObject | null = null; constructor(private listener: FullListener, private nextListener: InterceptingListener) {} onReceiveMetadata(metadata: Metadata): void { - const next = this.nextListener.onReceiveMetadata.bind(this.nextListener); - this.listener.onReceiveMetadata(metadata, next); + this.listener.onReceiveMetadata(metadata, (metadata) => { + this.nextListener.onReceiveMetadata(metadata); + }); } onReceiveMessage(message: any): void { - const next = this.nextListener.onReceiveMessage.bind(this.nextListener); - this.listener.onReceiveMessage(message, next); + /* If this listener processes messages asynchronously, the last message may + * be reordered with respect to the status */ + this.processingMessage = true; + this.listener.onReceiveMessage(message, (msg) => { + this.processingMessage = false; + this.nextListener.onReceiveMessage(msg); + if (this.pendingStatus) { + this.nextListener.onReceiveStatus(this.pendingStatus); + } + }); } onReceiveStatus(status: StatusObject): void { - const next = this.nextListener.onReceiveStatus.bind(this.nextListener); - this.listener.onReceiveStatus(status, next); + this.listener.onReceiveStatus(status, (processedStatus) => { + if (this.processingMessage) { + this.pendingStatus = processedStatus; + } else { + this.nextListener.onReceiveStatus(processedStatus); + } + }); } } @@ -107,11 +127,16 @@ export interface WriteCallback { (error?: Error | null): void; } -export type Call = { +export interface MessageContext { + callback?: WriteCallback; + flags?: number; +} + +export interface Call { cancelWithStatus(status: Status, details: string): void; getPeer(): string; start(metadata: Metadata, listener: InterceptingListener): void; - write(writeObj: WriteObject, callback: WriteCallback): void; + sendMessageWithContext(context: MessageContext, message: any): void; startRead(): void; halfClose(): void; @@ -214,6 +239,13 @@ export class Http2CallStream implements Call { } } else { this.listener!.onReceiveMessage(message); + /* Don't wait for the upper layer to ask for a read before pushing null + * to close out the call, because pushing null doesn't actually push + * another message up to the upper layer */ + if (this.unpushedReadMessages.length > 0 && this.unpushedReadMessages[0] === null) { + this.unpushedReadMessages.shift(); + this.push(null); + } } } @@ -495,7 +527,12 @@ export class Http2CallStream implements Call { } } - write(writeObj: WriteObject, cb: WriteCallback) { + sendMessageWithContext(context: MessageContext, message: Buffer) { + const writeObj: WriteObject = { + message: message, + flags: context.flags + }; + const cb: WriteCallback = context.callback || (() => {}); this.filterStack.sendMessage(Promise.resolve(writeObj)).then(message => { if (this.http2Stream === null) { this.pendingWrite = message.message; diff --git a/packages/grpc-js/src/call.ts b/packages/grpc-js/src/call.ts index 8e74e609..2f9cfdd4 100644 --- a/packages/grpc-js/src/call.ts +++ b/packages/grpc-js/src/call.ts @@ -160,7 +160,12 @@ export class ClientWritableStreamImpl extends Writable } _write(chunk: RequestType, encoding: string, cb: WriteCallback) { - tryWrite(this.call, this.serialize, chunk, encoding, cb); + const writeObj: WriteObject = { message: chunk }; + const flags: number = Number(encoding); + if (!Number.isNaN(flags)) { + writeObj.flags = flags; + } + this.call.write(writeObj, cb); } _final(cb: Function) { @@ -192,7 +197,12 @@ export class ClientDuplexStreamImpl extends Duplex } _write(chunk: RequestType, encoding: string, cb: WriteCallback) { - tryWrite(this.call, this.serialize, chunk, encoding, cb); + const writeObj: WriteObject = { message: chunk }; + const flags: number = Number(encoding); + if (!Number.isNaN(flags)) { + writeObj.flags = flags; + } + this.call.write(writeObj, cb); } _final(cb: Function) { diff --git a/packages/grpc-js/src/client-interceptors.ts b/packages/grpc-js/src/client-interceptors.ts new file mode 100644 index 00000000..9703db49 --- /dev/null +++ b/packages/grpc-js/src/client-interceptors.ts @@ -0,0 +1,423 @@ +/* + * Copyright 2019 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +import { Metadata } from './metadata'; +import { StatusObject, CallStreamOptions, Listener, MetadataListener, MessageListener, StatusListener, FullListener, InterceptingListener, WriteObject, WriteCallback, InterceptingListenerImpl, isInterceptingListener, MessageContext, Http2CallStream, Deadline, Call } from './call-stream'; +import { Status } from './constants'; +import { Channel } from './channel'; +import { CallOptions } from './client'; +import { CallCredentials } from './call-credentials'; +import { ClientMethodDefinition, Serialize } from './make-client'; + +export class InterceptorConfigurationError extends Error { + constructor(message: string) { + super(message); + this.name = 'InterceptorConfigurationError'; + Error.captureStackTrace(this, InterceptorConfigurationError); + } +} + +export interface MetadataRequester { + (metadata: Metadata, listener: InterceptingListener, next: (metadata: Metadata, listener: InterceptingListener | Listener) => void): void; +} + +export interface MessageRequester { + (message: any, next: (message: any) => void): void; +} + +export interface CloseRequester { + (next: () => void): void; +} + +export interface CancelRequester { + (next: () => void): void; +} + +export interface FullRequester { + start: MetadataRequester; + sendMessage: MessageRequester; + halfClose: CloseRequester; + cancel: CancelRequester; +} + +export type Requester = Partial; + +export class ListenerBuilder { + private metadata: MetadataListener | undefined = undefined; + private message: MessageListener | undefined = undefined; + private status: StatusListener | undefined = undefined; + + withOnReceiveMetadata(onReceiveMetadata: MetadataListener): this { + this.metadata = onReceiveMetadata; + return this; + } + + withOnReceiveMessage(onReceiveMessage: MessageListener): this { + this.message = onReceiveMessage; + return this; + } + + withOnReceiveStatus(onReceiveStatus: StatusListener): this { + this.status = onReceiveStatus; + return this; + } + + build(): Listener { + return { + onReceiveMetadata: this.metadata, + onReceiveMessage: this.message, + onReceiveStatus: this.status + } + } +} + +export class RequesterBuilder { + private start: MetadataRequester | undefined = undefined; + private message: MessageRequester | undefined = undefined; + private halfClose: CloseRequester | undefined = undefined; + private cancel: CancelRequester | undefined = undefined; + + withStart(start: MetadataRequester): this { + this.start = start; + return this; + } + + withSendMessage(sendMessage: MessageRequester): this { + this.message = sendMessage; + return this; + } + + withHalfClose(halfClose: CloseRequester): this { + this.halfClose = halfClose; + return this; + } + + withCancel(cancel: CancelRequester): this { + this.cancel = cancel; + return this; + } + + build(): Requester { + return { + start: this.start, + sendMessage: this.message, + halfClose: this.halfClose, + cancel: this.cancel + }; + } +} + +const defaultListener: FullListener = { + onReceiveMetadata: (metadata, next) => { + next(metadata); + }, + onReceiveMessage: (message, next) => { + next(message); + }, + onReceiveStatus: (status, next) => { + next(status); + } +}; + +const defaultRequester: FullRequester = { + start: (metadata, listener, next) => { + next(metadata, listener); + }, + sendMessage: (message, next) => { + next(message); + }, + halfClose: (next) => { + next(); + }, + cancel: (next) => { + next(); + } +} + +export interface InterceptorOptions extends CallOptions { + method_definition: ClientMethodDefinition; +} + +export interface InterceptingCallInterface { + cancelWithStatus(status: Status, details: string): void; + getPeer(): string; + start(metadata: Metadata, listener: InterceptingListener): void; + sendMessageWithContext(context: MessageContext, message: any): void; + startRead(): void; + halfClose(): void; + + getDeadline(): Deadline; + getCredentials(): CallCredentials; + setCredentials(credentials: CallCredentials): void; + getMethod(): string; + getHost(): string; +} + +export class InterceptingCall implements InterceptingCallInterface { + private requester: FullRequester; + private processingMessage = false; + private pendingHalfClose = false; + constructor(private nextCall: InterceptingCallInterface, requester?: Requester) { + if (requester) { + // Undefined elements overwrite, unset ones do not + this.requester = { + start: requester.start || defaultRequester.start, + sendMessage: requester.sendMessage || defaultRequester.sendMessage, + halfClose: requester.halfClose || defaultRequester.halfClose, + cancel: requester.cancel || defaultRequester.cancel + } + } else { + this.requester = defaultRequester; + } + } + + cancelWithStatus(status: Status, details: string) { + this.requester.cancel(() => { + this.nextCall.cancelWithStatus(status, details); + }); + } + + getPeer() { + return this.nextCall.getPeer(); + } + start(metadata: Metadata, interceptingListener: InterceptingListener): void { + this.requester.start(metadata, interceptingListener, (md, listener) => { + let finalInterceptingListener: InterceptingListener; + if (isInterceptingListener(listener)) { + finalInterceptingListener = listener; + } else { + const fullListener: FullListener = { + onReceiveMetadata: listener.onReceiveMetadata || defaultListener.onReceiveMetadata, + onReceiveMessage: listener.onReceiveMessage || defaultListener.onReceiveMessage, + onReceiveStatus: listener.onReceiveStatus || defaultListener.onReceiveStatus + }; + finalInterceptingListener = new InterceptingListenerImpl(fullListener, interceptingListener); + } + this.nextCall.start(md, finalInterceptingListener); + }); + } + sendMessageWithContext(context: MessageContext, message: any): void { + this.processingMessage = true; + this.requester.sendMessage(message, (finalMessage) => { + this.processingMessage = false; + this.nextCall.sendMessageWithContext(context, finalMessage); + if (this.pendingHalfClose) { + this.nextCall.halfClose(); + } + }) + } + sendMessage(message: any): void { + this.sendMessageWithContext({}, message); + } + startRead(): void { + this.nextCall.startRead(); + } + halfClose(): void { + this.requester.halfClose(() => { + if (this.processingMessage) { + this.pendingHalfClose = true; + } else { + this.nextCall.halfClose(); + } + }); + } + getDeadline(): number | Date { + return this.nextCall.getDeadline(); + } + getCredentials(): CallCredentials { + return this.nextCall.getCredentials(); + } + setCredentials(credentials: CallCredentials): void { + this.nextCall.setCredentials(credentials); + } + getMethod(): string { + return this.nextCall.getHost(); + } + getHost(): string { + return this.nextCall.getHost(); + } +} + +function getCall(channel: Channel, path: string, options: CallOptions): Call { + var deadline; + var host; + var parent; + var propagate_flags; + var credentials; + if (options) { + deadline = options.deadline; + host = options.host; + + propagate_flags = options.propagate_flags; + credentials = options.credentials; + } + if (deadline === undefined) { + deadline = Infinity; + } + var call = channel.createCall(path, deadline, host, + parent, propagate_flags); + if (credentials) { + call.setCredentials(credentials); + } + return call; +} + +class BaseInterceptingCall implements InterceptingCallInterface { + constructor(protected call: Call, protected methodDefinition: ClientMethodDefinition) {} + cancelWithStatus(status: Status, details: string): void { + this.call.cancelWithStatus(status, details); + } + getPeer(): string { + return this.call.getPeer(); + } + getDeadline(): number | Date { + return this.call.getDeadline(); + } + getCredentials(): CallCredentials { + return this.call.getCredentials(); + } + setCredentials(credentials: CallCredentials): void { + this.call.setCredentials(credentials); + } + getMethod(): string { + return this.call.getMethod(); + } + getHost(): string { + return this.call.getHost(); + } + sendMessageWithContext(context: MessageContext, message: any): void { + let serialized: Buffer; + try { + serialized = this.methodDefinition.requestSerialize(message); + this.call.sendMessageWithContext(context, serialized); + } catch (e) { + this.call.cancelWithStatus(Status.INTERNAL, 'Serialization failure'); + } + } + start(metadata: Metadata, listener: InterceptingListener): void { + let readError: StatusObject | null = null; + this.call.start(metadata, { + onReceiveMetadata: (metadata) => { + listener.onReceiveMetadata(metadata); + }, + onReceiveMessage: (message) => { + let deserialized: any; + try { + deserialized = this.methodDefinition.responseDeserialize(message); + listener.onReceiveMessage(deserialized); + } catch (e) { + readError = {code: Status.INTERNAL, details: 'Failed to parse server response', metadata: new Metadata()}; + this.call.cancelWithStatus(readError.code, readError.details); + } + }, + onReceiveStatus: (status) => { + if (readError) { + listener.onReceiveStatus(readError); + } else { + listener.onReceiveStatus(status); + } + } + }); + } + startRead() { + this.call.startRead(); + } + halfClose(): void { + this.call.halfClose(); + } +} + +class BaseUnaryInterceptingCall extends BaseInterceptingCall implements InterceptingCallInterface { + constructor(call: Call, methodDefinition: ClientMethodDefinition) { + super(call, methodDefinition); + } + start(metadata: Metadata, listener: InterceptingListener): void { + super.start(metadata, listener); + this.call.startRead(); + } +} + +class BaseStreamingInterceptingCall extends BaseInterceptingCall implements InterceptingCallInterface { } + +function getBottomInterceptingCall(channel: Channel, path: string, options: InterceptorOptions, methodDefinition: ClientMethodDefinition) { + const call = getCall(channel, path, options); + if (methodDefinition.responseStream) { + return new BaseStreamingInterceptingCall(call, methodDefinition); + } else { + return new BaseUnaryInterceptingCall(call, methodDefinition); + } +} + +export interface NextCall { + (options: InterceptorOptions): InterceptingCallInterface; +} + +export interface Interceptor { + (options: InterceptorOptions, nextCall: NextCall): InterceptingCall +} + +export interface InterceptorProvider { + (methodDefinition: ClientMethodDefinition): Interceptor; +} + +export interface InterceptorArguments { + clientInterceptors: Interceptor[], + clientInterceptorProviders: InterceptorProvider[], + callInterceptors: Interceptor[], + callInterceptorProviders: InterceptorProvider[] +} + +export function getInterceptingCall(interceptorArgs: InterceptorArguments, methodDefinition: ClientMethodDefinition, options: CallOptions, channel: Channel): InterceptingCallInterface { + if (interceptorArgs.clientInterceptors.length > 0 && interceptorArgs.clientInterceptorProviders.length > 0) { + throw new InterceptorConfigurationError( + 'Both interceptors and interceptor_providers were passed as options ' + + 'to the client constructor. Only one of these is allowed.' + ); + } + if (interceptorArgs.callInterceptors.length > 0 && interceptorArgs.callInterceptorProviders.length > 0) { + throw new InterceptorConfigurationError( + 'Both interceptors and interceptor_providers were passed as call ' + + 'options. Only one of these is allowed.' + ); + } + let interceptors: Interceptor[] = []; + // Interceptors passed to the call override interceptors passed to the client constructor + if (interceptorArgs.callInterceptors.length > 0 || interceptorArgs.callInterceptorProviders.length > 0) { + interceptors = ([] as Interceptor[]).concat( + interceptorArgs.callInterceptors, + interceptorArgs.callInterceptorProviders.map(provider => provider(methodDefinition)) + ).filter(interceptor => interceptor); + // Filter out falsy values when providers return nothing + } else { + interceptors = ([] as Interceptor[]).concat( + interceptorArgs.clientInterceptors, + interceptorArgs.clientInterceptorProviders.map(provider => provider(methodDefinition)) + ).filter(interceptor => interceptor); + // Filter out falsy values when providers return nothing + } + const interceptorOptions = Object.assign({}, options, {method_definition: methodDefinition}); + /* For each interceptor in the list, the nextCall function passed to it is + * based on the next interceptor in the list, using a nextCall function + * constructed with the following interceptor in the list, and so on. The + * initialValue, which is effectively at the end of the list, is a nextCall + * function that invokes getBottomInterceptingCall, which handles + * (de)serialization and also gets the underlying call from the channel */ + const getCall: NextCall = interceptors.reduceRight((previousValue: NextCall, currentValue: Interceptor) => { + return currentOptions => currentValue(currentOptions, previousValue); + }, (finalOptions: InterceptorOptions) => getBottomInterceptingCall(channel, methodDefinition.path, finalOptions, methodDefinition)); + return getCall(interceptorOptions); +} \ No newline at end of file diff --git a/packages/grpc-js/src/client.ts b/packages/grpc-js/src/client.ts index dc9e2eeb..a7fbfa55 100644 --- a/packages/grpc-js/src/client.ts +++ b/packages/grpc-js/src/client.ts @@ -35,8 +35,12 @@ import { ChannelCredentials } from './channel-credentials'; import { ChannelOptions } from './channel-options'; import { Status } from './constants'; import { Metadata } from './metadata'; +import { ClientMethodDefinition } from './make-client'; +import { getInterceptingCall, Interceptor, InterceptorProvider, InterceptorArguments } from './client-interceptors'; const CHANNEL_SYMBOL = Symbol(); +const INTERCEPTOR_SYMBOL = Symbol(); +const INTERCEPTOR_PROVIDER_SYMBOL = Symbol(); export interface UnaryCallback { (err: ServiceError | null, value?: ResponseType): void; @@ -49,6 +53,8 @@ export interface CallOptions { * but the server is not yet implemented so it makes no sense to have it */ propagate_flags?: number; credentials?: CallCredentials; + interceptors?: Interceptor[], + interceptor_providers?: InterceptorProvider[] } export type ClientOptions = Partial & { @@ -58,6 +64,8 @@ export type ClientOptions = Partial & { credentials: ChannelCredentials, options: ClientOptions ) => Channel; + interceptors?: Interceptor[], + interceptor_providers?: InterceptorProvider[] }; /** @@ -66,6 +74,8 @@ export type ClientOptions = Partial & { */ export class Client { private readonly [CHANNEL_SYMBOL]: Channel; + private readonly [INTERCEPTOR_SYMBOL]: Interceptor[]; + private readonly [INTERCEPTOR_PROVIDER_SYMBOL]: InterceptorProvider[]; constructor( address: string, credentials: ChannelCredentials, @@ -86,6 +96,13 @@ export class Client { options ); } + this[INTERCEPTOR_SYMBOL] = options.interceptors || []; + this[INTERCEPTOR_PROVIDER_SYMBOL] = options.interceptor_providers || []; + if (this[INTERCEPTOR_SYMBOL].length > 0 && this[INTERCEPTOR_PROVIDER_SYMBOL].length > 0) { + throw new Error( + 'Both interceptors and interceptor_providers were passed as options ' + + 'to the client constructor. Only one of these is allowed.'); + } } close(): void { @@ -201,36 +218,35 @@ export class Client { ({ metadata, options, callback } = this.checkOptionalUnaryResponseArguments< ResponseType >(metadata, options, callback)); - const call: Call = this[CHANNEL_SYMBOL].createCall( - method, - options.deadline, - options.host, - null, - options.propagate_flags - ); + const methodDefinition: ClientMethodDefinition = { + path: method, + requestStream: false, + responseStream: false, + requestSerialize: serialize, + responseDeserialize: deserialize + }; + const interceptorArgs: InterceptorArguments = { + clientInterceptors: this[INTERCEPTOR_SYMBOL], + clientInterceptorProviders: this[INTERCEPTOR_PROVIDER_SYMBOL], + callInterceptors: options.interceptors || [], + callInterceptorProviders: options.interceptor_providers || [] + }; + const call: Call = getInterceptingCall(interceptorArgs, methodDefinition, options, this[CHANNEL_SYMBOL]); if (options.credentials) { call.setCredentials(options.credentials); } - const message: Buffer = serialize(argument); - const writeObj: WriteObject = { message }; + const writeObj: WriteObject = { message: argument }; const emitter = new ClientUnaryCallImpl(call); let responseMessage: ResponseType | null = null; call.start(metadata, { onReceiveMetadata: (metadata) => { emitter.emit('metadata', metadata); }, - onReceiveMessage(message: Buffer) { + onReceiveMessage(message: any) { if (responseMessage != null) { call.cancelWithStatus(Status.INTERNAL, 'Too many responses received'); } - try { - responseMessage = deserialize(message); - } catch (e) { - call.cancelWithStatus( - Status.INTERNAL, - 'Failed to parse server response' - ); - } + responseMessage = message; call.startRead(); }, onReceiveStatus(status: StatusObject) { @@ -286,13 +302,20 @@ export class Client { ({ metadata, options, callback } = this.checkOptionalUnaryResponseArguments< ResponseType >(metadata, options, callback)); - const call: Call = this[CHANNEL_SYMBOL].createCall( - method, - options.deadline, - options.host, - null, - options.propagate_flags - ); + const methodDefinition: ClientMethodDefinition = { + path: method, + requestStream: true, + responseStream: false, + requestSerialize: serialize, + responseDeserialize: deserialize + }; + const interceptorArgs: InterceptorArguments = { + clientInterceptors: this[INTERCEPTOR_SYMBOL], + clientInterceptorProviders: this[INTERCEPTOR_PROVIDER_SYMBOL], + callInterceptors: options.interceptors || [], + callInterceptorProviders: options.interceptor_providers || [] + }; + const call: Call = getInterceptingCall(interceptorArgs, methodDefinition, options, this[CHANNEL_SYMBOL]); if (options.credentials) { call.setCredentials(options.credentials); } @@ -302,18 +325,11 @@ export class Client { onReceiveMetadata: (metadata) => { emitter.emit('metadata', metadata); }, - onReceiveMessage(message: Buffer) { + onReceiveMessage(message: any) { if (responseMessage != null) { call.cancelWithStatus(Status.INTERNAL, 'Too many responses received'); } - try { - responseMessage = deserialize(message); - } catch (e) { - call.cancelWithStatus( - Status.INTERNAL, - 'Failed to parse server response' - ); - } + responseMessage = message; call.startRead(); }, onReceiveStatus(status: StatusObject) { @@ -377,32 +393,31 @@ export class Client { options?: CallOptions ): ClientReadableStream { ({ metadata, options } = this.checkMetadataAndOptions(metadata, options)); - const call: Call = this[CHANNEL_SYMBOL].createCall( - method, - options.deadline, - options.host, - null, - options.propagate_flags - ); + const methodDefinition: ClientMethodDefinition = { + path: method, + requestStream: false, + responseStream: true, + requestSerialize: serialize, + responseDeserialize: deserialize + }; + const interceptorArgs: InterceptorArguments = { + clientInterceptors: this[INTERCEPTOR_SYMBOL], + clientInterceptorProviders: this[INTERCEPTOR_PROVIDER_SYMBOL], + callInterceptors: options.interceptors || [], + callInterceptorProviders: options.interceptor_providers || [] + }; + const call: Call = getInterceptingCall(interceptorArgs, methodDefinition, options, this[CHANNEL_SYMBOL]); if (options.credentials) { call.setCredentials(options.credentials); } - const message: Buffer = serialize(argument); - const writeObj: WriteObject = { message }; + const writeObj: WriteObject = { message: argument }; const stream = new ClientReadableStreamImpl(call, deserialize); call.start(metadata, { onReceiveMetadata(metadata: Metadata) { stream.emit('metadata', metadata); }, - onReceiveMessage(message: Buffer) { - let deserialized: ResponseType; - try { - deserialized = deserialize(message); - } catch (e) { - call.cancelWithStatus(Status.INTERNAL, 'Failed to parse server response'); - return; - } - if (stream.push(deserialized)) { + onReceiveMessage(message: any) { + if (stream.push(message)) { call.startRead(); } }, @@ -439,13 +454,20 @@ export class Client { options?: CallOptions ): ClientDuplexStream { ({ metadata, options } = this.checkMetadataAndOptions(metadata, options)); - const call: Call = this[CHANNEL_SYMBOL].createCall( - method, - options.deadline, - options.host, - null, - options.propagate_flags - ); + const methodDefinition: ClientMethodDefinition = { + path: method, + requestStream: true, + responseStream: true, + requestSerialize: serialize, + responseDeserialize: deserialize + }; + const interceptorArgs: InterceptorArguments = { + clientInterceptors: this[INTERCEPTOR_SYMBOL], + clientInterceptorProviders: this[INTERCEPTOR_PROVIDER_SYMBOL], + callInterceptors: options.interceptors || [], + callInterceptorProviders: options.interceptor_providers || [] + }; + const call: Call = getInterceptingCall(interceptorArgs, methodDefinition, options, this[CHANNEL_SYMBOL]); if (options.credentials) { call.setCredentials(options.credentials); } @@ -459,14 +481,7 @@ export class Client { stream.emit('metadata', metadata); }, onReceiveMessage(message: Buffer) { - let deserialized: ResponseType; - try { - deserialized = deserialize(message); - } catch (e) { - call.cancelWithStatus(Status.INTERNAL, 'Failed to parse server response'); - return; - } - if (stream.push(deserialized)) { + if (stream.push(message)) { call.startRead(); } }, diff --git a/packages/grpc-js/src/index.ts b/packages/grpc-js/src/index.ts index b9406fdb..271569d8 100644 --- a/packages/grpc-js/src/index.ts +++ b/packages/grpc-js/src/index.ts @@ -252,19 +252,6 @@ export type Call = | ClientDuplexStream; /* tslint:enable:no-any */ -export type MetadataListener = (metadata: Metadata, next: Function) => void; - -// tslint:disable-next-line:no-any -export type MessageListener = (message: any, next: Function) => void; - -export type StatusListener = (status: StatusObject, next: Function) => void; - -export interface Listener { - onReceiveMetadata?: MetadataListener; - onReceiveMessage?: MessageListener; - onReceiveStatus?: StatusListener; -} - /**** Unimplemented function stubs ****/ /* tslint:disable:no-any variable-name */ @@ -299,17 +286,16 @@ export const getClientChannel = (client: Client) => { export { StatusBuilder }; -export const ListenerBuilder = () => { - throw new Error('Not yet implemented'); -}; +export { Listener } from './call-stream'; -export const InterceptorBuilder = () => { - throw new Error('Not yet implemented'); -}; - -export const InterceptingCall = () => { - throw new Error('Not yet implemented'); -}; +export { + Requester, + ListenerBuilder, + RequesterBuilder, + Interceptor, + InterceptorProvider, + InterceptingCall, + InterceptorConfigurationError } from './client-interceptors'; export { GrpcObject } from './make-client'; diff --git a/packages/grpc-js/src/make-client.ts b/packages/grpc-js/src/make-client.ts index fc44c674..9f0bd76a 100644 --- a/packages/grpc-js/src/make-client.ts +++ b/packages/grpc-js/src/make-client.ts @@ -27,17 +27,27 @@ export interface Deserialize { (bytes: Buffer): T; } -export interface MethodDefinition { +export interface ClientMethodDefinition { path: string; requestStream: boolean; responseStream: boolean; requestSerialize: Serialize; - responseSerialize: Serialize; - requestDeserialize: Deserialize; responseDeserialize: Deserialize; originalName?: string; } +export interface ServerMethodDefinition { + path: string; + requestStream: boolean; + responseStream: boolean; + responseSerialize: Serialize; + requestDeserialize: Deserialize; + originalName?: string; +} + +export interface MethodDefinition extends ClientMethodDefinition, ServerMethodDefinition { +} + export interface ServiceDefinition { [index: string]: MethodDefinition; } diff --git a/packages/grpc-native-core/deps/grpc b/packages/grpc-native-core/deps/grpc index f703e1c8..e6224f8f 160000 --- a/packages/grpc-native-core/deps/grpc +++ b/packages/grpc-native-core/deps/grpc @@ -1 +1 @@ -Subproject commit f703e1c86c1504d9e48953f8da31f842679b7775 +Subproject commit e6224f8fd9e4903238cebf22b9d78d09e52c378c diff --git a/test/api/client_interceptors_test.js b/test/api/client_interceptors_test.js new file mode 100644 index 00000000..9922ca97 --- /dev/null +++ b/test/api/client_interceptors_test.js @@ -0,0 +1,1778 @@ +/** + * @license + * Copyright 2018 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +'use strict'; + +const options = { + keepCase: true, + longs: String, + enums: String, + defaults: true, + oneofs: true +}; +var _ = require('lodash'); +var assert = require('assert'); +const anyGrpc = require('../any_grpc'); +const clientGrpc = anyGrpc.client +const serverGrpc = anyGrpc.server; +const protoLoader = require('../../packages/proto-loader'); + +var insecureCreds = clientGrpc.credentials.createInsecure(); + +const echoProtoDef = protoLoader.loadSync(__dirname + '/../proto/echo_service.proto', options); +const EchoClient = clientGrpc.loadPackageDefinition(echoProtoDef).EchoService; +const echo_service = echoProtoDef.EchoService; + +var StatusBuilder = clientGrpc.StatusBuilder; +var ListenerBuilder = clientGrpc.ListenerBuilder; +var InterceptingCall = clientGrpc.InterceptingCall; +var RequesterBuilder = clientGrpc.RequesterBuilder; +const Metadata = clientGrpc.Metadata; + +var CallRegistry = function(done, expectation, is_ordered, is_verbose) { + this.call_map = {}; + this.call_array = []; + this.done = done; + this.expectation = expectation; + this.expectation_is_array = Array.isArray(this.expectation); + this.is_ordered = is_ordered; + this.is_verbose = is_verbose; + if (is_verbose) { + console.log('Expectation: ', expectation); + } +}; + +CallRegistry.prototype.addCall = function(call_name) { + if (this.expectation_is_array) { + this.call_array.push(call_name); + if (this.is_verbose) { + console.log(this.call_array); + } + } else { + if (!this.call_map[call_name]) { + this.call_map[call_name] = 0; + } + this.call_map[call_name]++; + if (this.is_verbose) { + console.log(this.call_map); + } + } + this.maybeCallDone(); +}; + +CallRegistry.prototype.maybeCallDone = function() { + if (this.expectation_is_array) { + if (this.is_ordered) { + if (this.expectation && _.isEqual(this.expectation, this.call_array)) { + this.done(); + } + } else { + var intersection = _.intersectionWith(this.expectation, this.call_array, + _.isEqual); + if (intersection.length === this.expectation.length) { + this.done(); + } + } + } else if (this.expectation && _.isEqual(this.expectation, this.call_map)) { + this.done(); + } +}; + +describe('Client interceptors', function() { + var echo_server; + var echo_port; + var client; + + function startServer() { + echo_server = new serverGrpc.Server(); + echo_server.addService(echo_service, { + echo: function(call, callback) { + call.sendMetadata(call.metadata); + if (call.request.value === 'error') { + var status = { + code: 2, + message: 'test status message' + }; + status.metadata = call.metadata; + callback(status, null); + return; + } + callback(null, call.request); + }, + echoClientStream: function(call, callback){ + call.sendMetadata(call.metadata); + var payload; + var err = null; + call.on('data', function(data) { + if (data.value === 'error') { + err = { + code: 2, + message: 'test status message' + }; + err.metadata = call.metadata; + return; + } + payload = data; + }); + call.on('end', function() { + callback(err, payload, call.metadata); + }); + }, + echoServerStream: function(call) { + call.sendMetadata(call.metadata); + if (call.request.value === 'error') { + var status = { + code: 2, + message: 'test status message' + }; + status.metadata = call.metadata; + call.emit('error', status); + return; + } + call.write(call.request); + call.end(call.metadata); + }, + echoBidiStream: function(call) { + call.sendMetadata(call.metadata); + call.on('data', function(data) { + if (data.value === 'error') { + var status = { + code: 2, + message: 'test status message' + }; + call.emit('error', status); + return; + } + call.write(data); + }); + call.on('end', function() { + call.end(call.metadata); + }); + } + }); + var server_credentials = serverGrpc.ServerCredentials.createInsecure(); + echo_port = echo_server.bind('localhost:0', server_credentials); + echo_server.start(); + } + + function stopServer() { + echo_server.forceShutdown(); + } + + function resetClient() { + client = new EchoClient('localhost:' + echo_port, insecureCreds); + } + + before(function() { + startServer(); + }); + beforeEach(function() { + resetClient(); + }); + after(function() { + stopServer(); + }); + + describe('execute downstream interceptors when a new call is made outbound', + function() { + var registry; + var options; + before(function() { + var stored_listener; + var stored_metadata; + var interceptor_a = function (options, nextCall) { + options.call_number = 1; + registry.addCall('construct a ' + options.call_number); + return new InterceptingCall(nextCall(options), { + start: function (metadata, listener, next) { + registry.addCall('start a ' + options.call_number); + stored_listener = listener; + stored_metadata = metadata; + next(metadata, listener); + }, + sendMessage: function (message, next) { + registry.addCall('send a ' + options.call_number); + var options2 = _.clone(options); + options2.call_number = 2; + var second_call = nextCall(options2); + second_call.start(stored_metadata); + second_call.sendMessage(message); + second_call.halfClose(); + next(message); + }, + halfClose: function (next) { + registry.addCall('close a ' + options.call_number); + next(); + } + }); + }; + + var interceptor_b = function (options, nextCall) { + registry.addCall('construct b ' + options.call_number); + return new InterceptingCall(nextCall(options), { + start: function (metadata, listener, next) { + registry.addCall('start b ' + options.call_number); + next(metadata, listener); + }, + sendMessage: function (message, next) { + registry.addCall('send b ' + options.call_number); + next(message); + }, + halfClose: function (next) { + registry.addCall('close b ' + options.call_number); + next(); + } + }); + }; + options = { + interceptors: [interceptor_a, interceptor_b] + }; + }); + var expected_calls = [ + 'construct a 1', + 'construct b 1', + 'start a 1', + 'start b 1', + 'send a 1', + 'construct b 2', + 'start b 2', + 'send b 2', + 'close b 2', + 'send b 1', + 'close a 1', + 'close b 1', + 'response' + ]; + + it('with unary call', function(done) { + registry = new CallRegistry(done, expected_calls, true); + var message = {}; + message.value = 'foo'; + client.echo(message, options, function(err, response){ + if (!err) { + registry.addCall('response'); + } + }); + }); + it('with client streaming call', function(done) { + registry = new CallRegistry(done, expected_calls, false); + var message = {}; + message.value = 'foo'; + var stream = client.echoClientStream(options, function(err, response) { + if (!err) { + registry.addCall('response'); + } + }); + stream.write(message); + stream.end(); + }); + it('with server streaming call', function(done) { + registry = new CallRegistry(done, expected_calls, true); + var message = {}; + message.value = 'foo'; + var stream = client.echoServerStream(message, options); + stream.on('data', function(data) { + registry.addCall('response'); + }); + }); + it('with bidi streaming call', function(done) { + registry = new CallRegistry( done, expected_calls, true); + var message = {}; + message.value = 'foo'; + var stream = client.echoBidiStream(options); + stream.on('data', function(data) { + registry.addCall('response'); + }); + stream.write(message); + stream.end(); + }); + }); + + + describe('execute downstream interceptors when a new call is made inbound', + function() { + var registry; + var options; + before(function() { + var interceptor_a = function (options, nextCall) { + return new InterceptingCall(nextCall(options), { + start: function (metadata, listener, next) { + next(metadata, { + onReceiveMetadata: function () { }, + onReceiveMessage: function (message, next) { + registry.addCall('interceptor_a'); + var second_call = nextCall(options); + second_call.start(metadata, listener); + second_call.sendMessage(message); + second_call.halfClose(); + }, + onReceiveStatus: function () { } + }); + } + }); + }; + + var interceptor_b = function (options, nextCall) { + return new InterceptingCall(nextCall(options), { + start: function (metadata, listener, next) { + next(metadata, { + onReceiveMessage: function (message, next) { + registry.addCall('interceptor_b'); + next(message); + } + }); + } + }); + }; + + options = { + interceptors: [interceptor_a, interceptor_b] + }; + + }); + var expected_calls = ['interceptor_b', 'interceptor_a', + 'interceptor_b', 'response']; + it('with unary call', function(done) { + registry = new CallRegistry(done, expected_calls, true); + var message = {}; + message.value = 'foo'; + client.echo(message, options, function(err) { + if (!err) { + registry.addCall('response'); + } + }); + }); + it('with client streaming call', function(done) { + registry = new CallRegistry(done, expected_calls, true); + var message = {}; + message.value = 'foo'; + var stream = client.echoClientStream(options, function(err, response) { + if (!err) { + registry.addCall('response'); + } + }); + stream.write(message); + stream.end(); + }); + }); + + it.skip('will delay operations and short circuit unary requests', function(done) { + var registry = new CallRegistry(done, ['foo_miss', 'foo_hit', 'bar_miss', + 'foo_hit_done', 'foo_miss_done', 'bar_miss_done']); + var cache = {}; + var _getCachedResponse = function(value) { + return cache[value]; + }; + var _store = function(key, value) { + cache[key] = value; + }; + + var interceptor = function(options, nextCall) { + var savedMetadata; + var startNext; + var storedListener; + var storedMessage; + var messageNext; + var requester = (new RequesterBuilder()) + .withStart(function(metadata, listener, next) { + savedMetadata = metadata; + storedListener = listener; + startNext = next; + }) + .withSendMessage(function(message, next) { + storedMessage = message; + messageNext = next; + }) + .withHalfClose(function(next) { + var cachedValue = _getCachedResponse(storedMessage.value); + if (cachedValue) { + var cachedMessage = {}; + cachedMessage.value = cachedValue; + registry.addCall(storedMessage.value + '_hit'); + storedListener.onReceiveMetadata(new Metadata()); + storedListener.onReceiveMessage(cachedMessage); + storedListener.onReceiveStatus( + (new StatusBuilder()).withCode(clientGrpc.status.OK).build()); + } else { + registry.addCall(storedMessage.value + '_miss'); + var newListener = (new ListenerBuilder()).withOnReceiveMessage( + function(message, next) { + _store(storedMessage.value, message.value); + next(message); + }).build(); + startNext(savedMetadata, newListener); + messageNext(storedMessage); + next(); + } + }) + .withCancel(function(message, next) { + next(); + }).build(); + + return new InterceptingCall(nextCall(options), requester); + }; + + var options = { + interceptors: [interceptor] + }; + + var foo_message = {}; + foo_message.value = 'foo'; + client.echo(foo_message, options, function(err, response){ + assert.equal(response.value, 'foo'); + registry.addCall('foo_miss_done'); + client.echo(foo_message, options, function(err, response){ + assert.equal(response.value, 'foo'); + registry.addCall('foo_hit_done'); + }); + }); + + var bar_message = {}; + bar_message.value = 'bar'; + client.echo(bar_message, options, function(err, response) { + assert.equal(response.value, 'bar'); + registry.addCall('bar_miss_done'); + }); + }); + + it('can retry failed messages and handle eventual success', function(done) { + var registry = new CallRegistry(done, + ['retry_foo_1', 'retry_foo_2', 'retry_foo_3', 'foo_result', + 'retry_bar_1', 'bar_result']); + var maxRetries = 3; + var retry_interceptor = function(options, nextCall) { + var savedMetadata; + var savedSendMessage; + var savedReceiveMessage; + var savedMessageNext; + var requester = (new RequesterBuilder()) + .withStart(function(metadata, listener, next) { + savedMetadata = metadata; + var new_listener = (new ListenerBuilder()) + .withOnReceiveMessage(function(message, next) { + savedReceiveMessage = message; + savedMessageNext = next; + }) + .withOnReceiveStatus(function(status, next) { + var retries = 0; + var retry = function(message, metadata) { + retries++; + var newCall = nextCall(options); + var receivedMessage; + newCall.start(metadata, { + onReceiveMessage: function(message) { + receivedMessage = message; + }, + onReceiveStatus: function(status) { + registry.addCall('retry_' + savedMetadata.get('name') + + '_' + retries); + if (status.code !== clientGrpc.status.OK) { + if (retries <= maxRetries) { + retry(message, metadata); + } else { + savedMessageNext(receivedMessage); + next(status); + } + } else { + registry.addCall('success_call'); + var new_status = (new StatusBuilder()) + .withCode(clientGrpc.status.OK).build(); + savedMessageNext(receivedMessage); + next(new_status); + } + } + }); + newCall.sendMessage(message); + newCall.halfClose(); + }; + if (status.code !== clientGrpc.status.OK) { + // Change the message we're sending only for test purposes + // so the server will respond without error + var newMessage = (savedMetadata.get('name')[0] === 'bar') ? + {value: 'bar'} : savedSendMessage; + retry(newMessage, savedMetadata); + } else { + savedMessageNext(savedReceiveMessage); + next(status); + } + } + ).build(); + next(metadata, new_listener); + }) + .withSendMessage(function(message, next) { + savedSendMessage = message; + next(message); + }).build(); + return new InterceptingCall(nextCall(options), requester); + }; + + var options = { + interceptors: [retry_interceptor] + }; + + // Make a call which the server will return a non-OK status for + var foo_message = {value: 'error'}; + var foo_metadata = new Metadata(); + foo_metadata.set('name', 'foo'); + client.echo(foo_message, foo_metadata, options, function(err, response) { + assert.strictEqual(err.code, 2); + registry.addCall('foo_result'); + }); + + // Make a call which will fail the first time and succeed on the first + // retry + var bar_message = {value: 'error'}; + var bar_metadata = new Metadata(); + bar_metadata.set('name', 'bar'); + client.echo(bar_message, bar_metadata, options, function(err, response) { + assert.strictEqual(response.value, 'bar'); + registry.addCall('bar_result'); + }); + }); + + it('can retry and preserve interceptor order on success', function(done) { + var registry = new CallRegistry(done, + ['interceptor_c', 'retry_interceptor', 'fail_call', 'interceptor_c', + 'success_call', 'interceptor_a', 'result'], true); + var interceptor_a = function(options, nextCall) { + var requester = (new RequesterBuilder()) + .withStart(function(metadata, listener, next) { + var new_listener = (new ListenerBuilder()) + .withOnReceiveMessage(function(message, next) { + registry.addCall('interceptor_a'); + next(message); + }).build(); + next(metadata, new_listener); + }).build(); + return new InterceptingCall(nextCall(options), requester); + }; + + var retry_interceptor = function(options, nextCall) { + var savedMetadata; + var savedMessage; + var savedMessageNext; + var sendMessageNext; + var originalMessage; + var startNext; + var originalListener; + var requester = (new RequesterBuilder()) + .withStart(function(metadata, listener, next) { + startNext = next; + savedMetadata = metadata; + originalListener = listener; + var new_listener = (new ListenerBuilder()) + .withOnReceiveMessage(function(message, next) { + savedMessage = message; + savedMessageNext = next; + }) + .withOnReceiveStatus(function(status, next) { + var retries = 0; + var maxRetries = 1; + var receivedMessage; + var retry = function(message, metadata) { + retries++; + var new_call = nextCall(options); + new_call.start(metadata, { + onReceiveMessage: function(message) { + receivedMessage = message; + }, + onReceiveStatus: function(status) { + if (status.code !== clientGrpc.status.OK) { + if (retries <= maxRetries) { + retry(message, metadata); + } else { + savedMessageNext(receivedMessage); + next(status); + } + } else { + registry.addCall('success_call'); + var new_status = (new StatusBuilder()) + .withCode(clientGrpc.status.OK).build(); + savedMessageNext(receivedMessage); + next(new_status); + } + } + }); + new_call.sendMessage(message); + new_call.halfClose(); + }; + registry.addCall('retry_interceptor'); + if (status.code !== clientGrpc.status.OK) { + registry.addCall('fail_call'); + var newMessage = {value: 'foo'}; + retry(newMessage, savedMetadata); + } else { + savedMessageNext(savedMessage); + next(status); + } + }).build(); + next(metadata, new_listener); + }) + .withSendMessage(function(message, next) { + sendMessageNext = next; + originalMessage = message; + next(message); + }) + .build(); + return new InterceptingCall(nextCall(options), requester); + }; + + var interceptor_c = function(options, nextCall) { + var requester = (new RequesterBuilder()) + .withStart(function(metadata, listener, next) { + var new_listener = (new ListenerBuilder()) + .withOnReceiveMessage(function(message, next) { + registry.addCall('interceptor_c'); + next(message); + }).build(); + next(metadata, new_listener); + }).build(); + return new InterceptingCall(nextCall(options), requester); + }; + + var options = { + interceptors: [interceptor_a, retry_interceptor, interceptor_c] + }; + + var message = {value: 'error'}; + client.echo(message, options, function(err, response) { + assert.strictEqual(response.value, 'foo'); + registry.addCall('result'); + }); + }); + + describe('handle interceptor errors', function (doneOuter) { + var options; + before(function () { + var foo_interceptor = function (options, nextCall) { + var savedListener; + var outbound = (new RequesterBuilder()) + .withStart(function (metadata, listener, next) { + savedListener = listener; + next(metadata, listener); + }) + .withSendMessage(function (message, next) { + savedListener.onReceiveMetadata(new Metadata()); + savedListener.onReceiveMessage({ value: 'failed' }); + var error_status = (new StatusBuilder()) + .withCode(16) + .withDetails('Error in foo interceptor') + .build(); + savedListener.onReceiveStatus(error_status); + }).build(); + return new InterceptingCall(nextCall(options), outbound); + }; + options = { interceptors: [foo_interceptor] }; + }); + it('with unary call', function(done) { + var message = {}; + client.echo(message, options, function(err, response) { + assert.strictEqual(err.code, 16); + assert.strictEqual(err.message, + '16 UNAUTHENTICATED: Error in foo interceptor'); + done(); + doneOuter(); + }); + }); + }); + + describe('implement fallbacks for streaming RPCs', function() { + + var options; + before(function () { + var fallback_response = { value: 'fallback' }; + var savedMessage; + var savedMessageNext; + var interceptor = function (options, nextCall) { + var requester = (new RequesterBuilder()) + .withStart(function (metadata, listener, next) { + var new_listener = (new ListenerBuilder()) + .withOnReceiveMessage(function (message, next) { + savedMessage = message; + savedMessageNext = next; + }) + .withOnReceiveStatus(function (status, next) { + if (status.code !== clientGrpc.status.OK) { + savedMessageNext(fallback_response); + next((new StatusBuilder()).withCode(clientGrpc.status.OK)); + } else { + savedMessageNext(savedMessage); + next(status); + } + }).build(); + next(metadata, new_listener); + }).build(); + return new InterceptingCall(nextCall(options), requester); + }; + options = { + interceptors: [interceptor] + }; + }); + it('with client streaming call', function (done) { + var registry = new CallRegistry(done, ['foo_result', 'fallback_result']); + var stream = client.echoClientStream(options, function (err, response) { + assert.strictEqual(response.value, 'foo'); + registry.addCall('foo_result'); + }); + stream.write({ value: 'foo' }); + stream.end(); + + stream = client.echoClientStream(options, function(err, response) { + assert.ifError(err); + assert.strictEqual(response.value, 'fallback'); + registry.addCall('fallback_result'); + }); + stream.write({value: 'error'}); + stream.end(); + }); + }); + + describe('allows the call options to be modified for downstream interceptors', + function() { + var done; + var options; + var method_name; + var method_path_last; + before(function() { + var interceptor_a = function (options, nextCall) { + options.deadline = 10; + return new InterceptingCall(nextCall(options)); + }; + var interceptor_b = function (options, nextCall) { + assert.equal(options.method_definition.path, '/EchoService/' + + method_path_last); + assert.equal(options.deadline, 10); + done(); + return new InterceptingCall(nextCall(options)); + }; + + options = { + interceptors: [interceptor_a, interceptor_b], + deadline: 100 + }; + }); + + it('with unary call', function(cb) { + done = cb; + var metadata = new Metadata(); + var message = {}; + method_name = 'echo'; + method_path_last = 'Echo'; + + client.echo(message, metadata, options, function(){}); + }); + + it('with client streaming call', function(cb) { + done = cb; + var metadata = new Metadata(); + method_name = 'echoClientStream'; + method_path_last = 'EchoClientStream'; + + client.echoClientStream(metadata, options, function() {}); + }); + + it('with server streaming call', function(cb) { + done = cb; + var metadata = new Metadata(); + var message = {}; + method_name = 'echoServerStream'; + method_path_last = 'EchoServerStream'; + + client.echoServerStream(message, metadata, options); + }); + + it('with bidi streaming call', function(cb) { + done = cb; + var metadata = new Metadata(); + method_name = 'echoBidiStream'; + method_path_last = 'EchoBidiStream'; + + client.echoBidiStream(metadata, options); + }); + }); + + describe('pass accurate MethodDefinitions', function() { + var registry; + var initial_value = 'broken'; + var expected_value = 'working'; + var options; + before(function() { + var interceptor = function (options, nextCall) { + registry.addCall({ + path: options.method_definition.path, + requestStream: options.method_definition.requestStream, + responseStream: options.method_definition.responseStream + }); + var outbound = (new RequesterBuilder()) + .withSendMessage(function (message, next) { + message.value = expected_value; + next(message); + }).build(); + return new InterceptingCall(nextCall(options), outbound); + }; + options = { interceptors: [interceptor] }; + }); + + it('with unary call', function(done) { + var unary_definition = { + path: '/EchoService/Echo', + requestStream: false, + responseStream: false + }; + registry = new CallRegistry(done, [ + unary_definition, + 'result_unary' + ]); + + var metadata = new Metadata(); + + var message = {value: initial_value}; + + client.echo(message, metadata, options, function(err, response){ + assert.equal(response.value, expected_value); + registry.addCall('result_unary'); + }); + + }); + it('with client streaming call', function(done) { + + var client_stream_definition = { + path: '/EchoService/EchoClientStream', + requestStream: true, + responseStream: false + }; + registry = new CallRegistry(done, [ + client_stream_definition, + 'result_client_stream' + ], false, true); + var metadata = new Metadata(); + var message = {value: initial_value}; + var client_stream = client.echoClientStream(metadata, options, + function(err, response) { + assert.strictEqual(response.value, expected_value); + registry.addCall('result_client_stream'); + }); + client_stream.write(message); + client_stream.end(); + + }); + it('with server streaming call', function(done) { + var server_stream_definition = { + path: '/EchoService/EchoServerStream', + responseStream: true, + requestStream: false, + }; + registry = new CallRegistry(done, [ + server_stream_definition, + 'result_server_stream' + ]); + + var metadata = new Metadata(); + var message = {value: initial_value}; + var server_stream = client.echoServerStream(message, metadata, options); + server_stream.on('data', function(data) { + assert.strictEqual(data.value, expected_value); + registry.addCall('result_server_stream'); + }); + + }); + it('with bidi streaming call', function(done) { + var bidi_stream_definition = { + path: '/EchoService/EchoBidiStream', + requestStream: true, + responseStream: true + }; + registry = new CallRegistry(done, [ + bidi_stream_definition, + 'result_bidi_stream' + ]); + + var metadata = new Metadata(); + var message = {value: initial_value}; + var bidi_stream = client.echoBidiStream(metadata, options); + bidi_stream.on('data', function(data) { + assert.strictEqual(data.value, expected_value); + registry.addCall('result_bidi_stream'); + }); + bidi_stream.write(message); + bidi_stream.end(); + }); + }); + + it('uses interceptors passed to the client constructor', function(done) { + var registry = new CallRegistry(done, { + 'constructor_interceptor_a_echo': 1, + 'constructor_interceptor_b_echoServerStream': 1, + 'invocation_interceptor': 1, + 'result_unary': 1, + 'result_stream': 1, + 'result_invocation': 1 + }); + + var constructor_interceptor_a = function(options, nextCall) { + var outbound = (new RequesterBuilder()) + .withStart(function(metadata, listener, next) { + registry.addCall('constructor_interceptor_a_echo'); + next(metadata, listener); + }).build(); + return new InterceptingCall(nextCall(options), outbound); + }; + var constructor_interceptor_b = function(options, nextCall) { + var outbound = (new RequesterBuilder()) + .withStart(function(metadata, listener, next) { + registry.addCall('constructor_interceptor_b_echoServerStream'); + next(metadata, listener); + }).build(); + return new InterceptingCall(nextCall(options), outbound); + }; + var invocation_interceptor = function(options, nextCall) { + var outbound = (new RequesterBuilder()) + .withStart(function(metadata, listener, next) { + registry.addCall('invocation_interceptor'); + next(metadata, listener); + }).build(); + return new InterceptingCall(nextCall(options), outbound); + }; + + var interceptor_providers = [ + function(method_definition) { + if (!method_definition.requestStream && + !method_definition.responseStream) { + return constructor_interceptor_a; + } + }, + function(method_definition) { + if (!method_definition.requestStream && + method_definition.responseStream) { + return constructor_interceptor_b; + } + } + ]; + var constructor_options = { + interceptor_providers: interceptor_providers + }; + var int_client = new EchoClient('localhost:' + echo_port, insecureCreds, + constructor_options); + var message = {}; + int_client.echo(message, function() { + registry.addCall('result_unary'); + }); + var stream = int_client.echoServerStream(message); + stream.on('data', function() { + registry.addCall('result_stream'); + }); + + var options = { interceptors: [invocation_interceptor] }; + int_client.echo(message, options, function() { + registry.addCall('result_invocation'); + }); + }); + + it.skip('will reject conflicting interceptor options at invocation', + function(done) { + try { + client.echo('message', { + interceptors: [], + interceptor_providers: [] + }, function () {}); + } catch (e) { + assert.equal(e.name, 'InterceptorConfigurationError'); + done(); + } + }); + + it('will resolve interceptor providers at invocation', function(done) { + var constructor_interceptor = function(options, nextCall) { + var outbound = (new RequesterBuilder()) + .withStart(function() { + assert(false); + }).build(); + return new InterceptingCall(nextCall(options), outbound); + }; + var invocation_interceptor = function(options, nextCall) { + var outbound = (new RequesterBuilder()) + .withStart(function() { + done(); + }).build(); + return new InterceptingCall(nextCall(options), outbound); + }; + var constructor_interceptor_providers = [ + function() { + return constructor_interceptor; + } + ]; + var invocation_interceptor_providers = [ + function() { + return invocation_interceptor; + } + ]; + var constructor_options = { + interceptor_providers: constructor_interceptor_providers + }; + var int_client = new EchoClient('localhost:' + echo_port, insecureCreds, + constructor_options); + var message = {}; + var options = { interceptor_providers: invocation_interceptor_providers }; + int_client.echo(message, options, function() {}); + }); + + describe('trigger a stack of interceptors in nested order', function() { + var registry; + var expected_calls = ['constructA', 'constructB', 'outboundA', 'outboundB', + 'inboundB', 'inboundA']; + var options; + before(function() { + var interceptor_a = function (options, nextCall) { + registry.addCall('constructA'); + var outbound = (new RequesterBuilder()) + .withStart(function (metadata, listener, next) { + registry.addCall('outboundA'); + var new_listener = (new ListenerBuilder()).withOnReceiveMessage( + function (message, next) { + registry.addCall('inboundA'); + next(message); + }).build(); + next(metadata, new_listener); + }).build(); + return new clientGrpc.InterceptingCall(nextCall(options), + outbound); + }; + var interceptor_b = function (options, nextCall) { + registry.addCall('constructB'); + var outbound = (new RequesterBuilder()) + .withStart(function (metadata, listener, next) { + registry.addCall('outboundB'); + var new_listener = (new ListenerBuilder()).withOnReceiveMessage( + function (message, next) { + registry.addCall('inboundB'); + next(message); + }).build(); + next(metadata, new_listener); + }).build(); + return new InterceptingCall(nextCall(options), + outbound); + }; + options = { interceptors: [interceptor_a, interceptor_b] }; + }); + var metadata = new Metadata(); + var message = {}; + + it('with unary call', function(done) { + registry = new CallRegistry(done, expected_calls, true); + client.echo(message, metadata, options, function(){}); + }); + + it('with client streaming call', function(done) { + registry = new CallRegistry(done, expected_calls, true); + var client_stream = client.echoClientStream(metadata, options, + function() {}); + client_stream.write(message); + client_stream.end(); + }); + + it('with server streaming call', function(done) { + registry = new CallRegistry(done, expected_calls, true); + var stream = client.echoServerStream(message, metadata, options); + stream.on('data', function() {}); + }); + + it('with bidi streaming call', function(done) { + registry = new CallRegistry(done, expected_calls, true); + var bidi_stream = client.echoBidiStream(metadata, options); + bidi_stream.on('data', function(){}); + bidi_stream.write(message); + bidi_stream.end(); + }); + }); + + describe('trigger interceptors horizontally', function() { + var expected_calls = [ + 'interceptor_a_start', + 'interceptor_b_start', + 'interceptor_a_send', + 'interceptor_b_send' + ]; + var registry; + var options; + var metadata = new Metadata(); + var message = {}; + + before(function() { + var interceptor_a = function (options, nextCall) { + var outbound = (new RequesterBuilder()) + .withStart(function (metadata, listener, next) { + registry.addCall('interceptor_a_start'); + next(metadata, listener); + }) + .withSendMessage(function (message, next) { + registry.addCall('interceptor_a_send'); + next(message); + }).build(); + return new InterceptingCall(nextCall(options), + outbound); + }; + var interceptor_b = function (options, nextCall) { + var outbound = (new RequesterBuilder()) + .withStart(function (metadata, listener, next) { + registry.addCall('interceptor_b_start'); + next(metadata, listener); + }) + .withSendMessage(function (message, next) { + registry.addCall('interceptor_b_send'); + next(message); + }).build(); + return new InterceptingCall(nextCall(options), + outbound); + }; + options = { interceptors: [interceptor_a, interceptor_b] }; + }); + + it('with unary call', function(done) { + registry = new CallRegistry(done, expected_calls, true); + client.echo(message, metadata, options, function(){}); + }); + + it('with client streaming call', function(done) { + registry = new CallRegistry(done, expected_calls, true); + var client_stream = client.echoClientStream(metadata, options, + function() {}); + client_stream.write(message); + client_stream.end(); + }); + + it('with server streaming call', function(done) { + registry = new CallRegistry(done, expected_calls, true); + var stream = client.echoServerStream(message, metadata, options); + stream.on('data', function() {}); + }); + + it('with bidi streaming call', function(done) { + registry = new CallRegistry(done, expected_calls, true); + var bidi_stream = client.echoBidiStream(metadata, options); + bidi_stream.on('data', function(){}); + bidi_stream.write(message); + bidi_stream.end(); + }); + }); + + describe('trigger when sending metadata', function() { + var registry; + + var message = {}; + var key_names = ['original', 'foo', 'bar']; + var keys = { + original: 'originalkey', + foo: 'fookey', + bar: 'barkey' + }; + var values = { + original: 'originalvalue', + foo: 'foovalue', + bar: 'barvalue' + }; + var expected_calls = ['foo', 'bar', 'response']; + var options; + before(function () { + var foo_interceptor = function (options, nextCall) { + var outbound = (new RequesterBuilder()) + .withStart(function (metadata, listener, next) { + metadata.add(keys.foo, values.foo); + registry.addCall('foo'); + next(metadata, listener); + }).build(); + return new InterceptingCall(nextCall(options), outbound); + }; + var bar_interceptor = function (options, nextCall) { + var outbound = (new RequesterBuilder()) + .withStart(function (metadata, listener, next) { + metadata.add(keys.bar, values.bar); + registry.addCall('bar'); + next(metadata, listener); + }).build(); + return new InterceptingCall(nextCall(options), outbound); + }; + options = { interceptors: [foo_interceptor, bar_interceptor] }; + }); + + it('with unary call', function (done) { + registry = new CallRegistry(done, expected_calls, true); + var metadata = new Metadata(); + metadata.add(keys.original, values.original); + + var unary_call = client.echo(message, metadata, options, function () {}); + unary_call.on('metadata', function (metadata) { + var has_expected_values = _.every(key_names, function (key_name) { + return _.isEqual(metadata.get(keys[key_name]), [values[key_name]]); + }); + assert(has_expected_values); + registry.addCall('response'); + }); + }); + it('with client streaming call', function(done) { + registry = new CallRegistry(done, expected_calls, true); + var metadata = new Metadata(); + metadata.add(keys.original, values.original); + + var client_stream = client.echoClientStream(metadata, options, + function () { + }); + client_stream.write(message); + client_stream.on('metadata', function (metadata) { + var has_expected_values = _.every(key_names, function (key_name) { + return _.isEqual(metadata.get(keys[key_name]), [values[key_name]]); + }); + assert(has_expected_values); + registry.addCall('response'); + }); + client_stream.end(); + }); + it('with server streaming call', function(done) { + registry = new CallRegistry(done, expected_calls, true); + var metadata = new Metadata(); + metadata.add(keys.original, values.original); + var server_stream = client.echoServerStream(message, metadata, options); + server_stream.on('metadata', function (metadata) { + var has_expected_values = _.every(key_names, function (key_name) { + return _.isEqual(metadata.get(keys[key_name]), [values[key_name]]); + }); + assert(has_expected_values); + registry.addCall('response'); + }); + server_stream.on('data', function() { }); + }); + it('with bidi streaming call', function(done) { + registry = new CallRegistry(done, expected_calls, true); + var metadata = new Metadata(); + metadata.add(keys.original, values.original); + var bidi_stream = client.echoBidiStream(metadata, options); + bidi_stream.on('metadata', function(metadata) { + var has_expected_values = _.every(key_names, function(key_name) { + return _.isEqual(metadata.get(keys[key_name]),[values[key_name]]); + }); + assert(has_expected_values); + bidi_stream.end(); + registry.addCall('response'); + }); + bidi_stream.on('data', function() { }); + bidi_stream.write(message); + bidi_stream.end(); + }); + }); + + describe('trigger when sending messages', function() { + var registry; + var originalValue = 'foo'; + var expectedValue = 'bar'; + var options; + var metadata = new Metadata(); + var expected_calls = ['messageIntercepted', 'response']; + + before(function() { + var foo_interceptor = function (options, nextCall) { + var outbound = (new RequesterBuilder()) + .withSendMessage(function (message, next) { + assert.strictEqual(message.value, originalValue); + registry.addCall('messageIntercepted'); + next({ value: expectedValue }); + }).build(); + return new InterceptingCall(nextCall(options), + outbound); + }; + options = { interceptors: [foo_interceptor] }; + }); + + it('with unary call', function(done) { + registry = new CallRegistry(done, expected_calls, true); + var message = {value: originalValue}; + + client.echo(message, metadata, options, function (err, response) { + assert.strictEqual(response.value, expectedValue); + registry.addCall('response'); + }); + }); + it('with client streaming call', function(done) { + registry = new CallRegistry(done, expected_calls, true); + var message = {value: originalValue}; + var client_stream = client.echoClientStream(metadata, options, + function (err, response) { + assert.strictEqual(response.value, expectedValue); + registry.addCall('response'); + }); + client_stream.write(message); + client_stream.end(); + }); + it('with server streaming call', function(done) { + registry = new CallRegistry(done, expected_calls, true); + var message = {value: originalValue}; + var server_stream = client.echoServerStream(message, metadata, options); + server_stream.on('data', function (data) { + assert.strictEqual(data.value, expectedValue); + registry.addCall('response'); + }); + }); + it('with bidi streaming call', function(done) { + registry = new CallRegistry(done, expected_calls, true); + var message = {value: originalValue}; + var bidi_stream = client.echoBidiStream(metadata, options); + bidi_stream.on('data', function(data) { + assert.strictEqual(data.value, expectedValue); + registry.addCall('response'); + }); + bidi_stream.write(message); + bidi_stream.end(); + }); + }); + + describe('trigger when client closes the call', function() { + var registry; + var expected_calls = [ + 'response', 'halfClose' + ]; + var message = {}; + var options; + before(function() { + var foo_interceptor = function (options, nextCall) { + var outbound = (new RequesterBuilder()) + .withHalfClose(function (next) { + registry.addCall('halfClose'); + next(); + }).build(); + return new InterceptingCall(nextCall(options), + outbound); + }; + options = { interceptors: [foo_interceptor] }; + }); + it('with unary call', function (done) { + registry = new CallRegistry(done, expected_calls); + client.echo(message, options, function (err, response) { + if (!err) { + registry.addCall('response'); + } + }); + }); + it('with client streaming call', function (done) { + registry = new CallRegistry(done, expected_calls); + var client_stream = client.echoClientStream(options, + function (err, response) { }); + client_stream.write(message, function (err) { + if (!err) { + registry.addCall('response'); + } + }); + client_stream.end(); + }); + it('with server streaming call', function (done) { + registry = new CallRegistry(done, expected_calls); + var server_stream = client.echoServerStream(message, options); + server_stream.on('data', function (data) { + registry.addCall('response'); + }); + }); + it('with bidi streaming call', function (done) { + registry = new CallRegistry(done, expected_calls); + var bidi_stream = client.echoBidiStream(options); + bidi_stream.on('data', function (data) { + registry.addCall('response'); + }); + bidi_stream.write(message); + bidi_stream.end(); + }); + }); + + describe('trigger when the stream is canceled', function() { + var done; + var message = {}; + var options; + before(function() { + var foo_interceptor = function (options, nextCall) { + var outbound = (new RequesterBuilder()) + .withCancel(function (next) { + done(); + next(); + }).build(); + return new InterceptingCall(nextCall(options), + outbound); + }; + options = { interceptors: [foo_interceptor] }; + }); + + it('with unary call', function(cb) { + done = cb; + var stream = client.echo(message, options, function() {}); + stream.cancel(); + }); + + it('with client streaming call', function(cb) { + done = cb; + var stream = client.echoClientStream(options, function() {}); + stream.cancel(); + }); + it('with server streaming call', function(cb) { + done = cb; + var stream = client.echoServerStream(message, options); + stream.cancel(); + }); + it('with bidi streaming call', function(cb) { + done = cb; + var stream = client.echoBidiStream(options); + stream.cancel(); + }); + }); + + describe('trigger when receiving metadata', function() { + var message = {}; + var expectedKey = 'foo'; + var expectedValue = 'bar'; + var options; + before(function() { + var foo_interceptor = function (options, nextCall) { + var outbound = (new RequesterBuilder()) + .withStart(function (metadata, listener, next) { + var new_listener = (new ListenerBuilder()).withOnReceiveMetadata( + function (metadata, next) { + metadata.add(expectedKey, expectedValue); + next(metadata); + }).build(); + next(metadata, new_listener); + }).build(); + return new InterceptingCall(nextCall(options), outbound); + }; + options = { interceptors: [foo_interceptor] }; + }); + + it('with unary call', function(done) { + var metadata = new Metadata(); + var unary_call = client.echo(message, metadata, options, function () {}); + unary_call.on('metadata', function (metadata) { + assert.strictEqual(metadata.get(expectedKey)[0], expectedValue); + done(); + }); + }); + it('with client streaming call', function(done) { + var metadata = new Metadata(); + var client_stream = client.echoClientStream(metadata, options, + function () {}); + client_stream.write(message); + client_stream.on('metadata', function (metadata) { + assert.strictEqual(metadata.get(expectedKey)[0], expectedValue); + done(); + }); + client_stream.end(); + }); + it('with server streaming call', function(done) { + var metadata = new Metadata(); + var server_stream = client.echoServerStream(message, metadata, options); + server_stream.on('metadata', function (metadata) { + assert.strictEqual(metadata.get(expectedKey)[0], expectedValue); + done(); + }); + server_stream.on('data', function() { }); + }); + it('with bidi streaming call', function(done) { + var metadata = new Metadata(); + var bidi_stream = client.echoBidiStream(metadata, options); + bidi_stream.on('metadata', function(metadata) { + assert.strictEqual(metadata.get(expectedKey)[0], expectedValue); + bidi_stream.end(); + done(); + }); + bidi_stream.on('data', function() { }); + bidi_stream.write(message); + }); + }); + + describe('trigger when sending messages', function() { + var originalValue = 'foo'; + var expectedValue = 'bar'; + var options; + var metadata = new Metadata(); + before(function() { + var foo_interceptor = function (options, nextCall) { + var outbound = (new RequesterBuilder()) + .withStart(function (metadata, listener, next) { + var new_listener = (new ListenerBuilder()).withOnReceiveMessage( + function (message, next) { + if (!message) { + next(message); + return; + } + assert.strictEqual(message.value, originalValue); + message.value = expectedValue; + next(message); + }).build(); + next(metadata, new_listener); + }).build(); + return new InterceptingCall(nextCall(options), + outbound); + }; + options = { interceptors: [foo_interceptor] }; + }); + + it('with unary call', function (done) { + var message = { value: originalValue }; + client.echo(message, metadata, options, function (err, response) { + assert.strictEqual(response.value, expectedValue); + done(); + }); + }); + it('with client streaming call', function (done) { + var message = { value: originalValue }; + var client_stream = client.echoClientStream(metadata, options, + function (err, response) { + assert.strictEqual(response.value, expectedValue); + done(); + }); + client_stream.write(message); + client_stream.end(); + }); + it('with server streaming call', function (done) { + var message = { value: originalValue }; + var server_stream = client.echoServerStream(message, metadata, options); + server_stream.on('data', function (data) { + assert.strictEqual(data.value, expectedValue); + done(); + }); + }); + it('with bidi streaming call', function (done) { + var message = { value: originalValue }; + var bidi_stream = client.echoBidiStream(metadata, options); + bidi_stream.on('data', function (data) { + assert.strictEqual(data.value, expectedValue); + done(); + }); + bidi_stream.write(message); + bidi_stream.end(); + }); + }); + + describe('trigger when receiving status', function() { + var expectedStatus = 'foo'; + var options; + var metadata = new Metadata(); + before(function() { + var foo_interceptor = function (options, nextCall) { + var outbound = (new RequesterBuilder()) + .withStart(function (metadata, listener, next) { + var new_listener = (new ListenerBuilder()).withOnReceiveStatus( + function (status, next) { + assert.strictEqual(status.code, 2); + assert.strictEqual(status.details, 'test status message'); + var new_status = { + code: 1, + details: expectedStatus, + metadata: {} + }; + next(new_status); + }).build(); + next(metadata, new_listener); + }).build(); + return new InterceptingCall(nextCall(options), + outbound); + }; + options = { interceptors: [foo_interceptor] }; + }); + it('with unary call', function (done) { + var message = { value: 'error' }; + var unary_call = client.echo(message, metadata, options, function () { + }); + unary_call.on('status', function (status) { + assert.strictEqual(status.code, 1); + assert.strictEqual(status.details, expectedStatus); + done(); + }); + }); + it('with client streaming call', function (done) { + var message = { value: 'error' }; + var client_stream = client.echoClientStream(metadata, options, + function () { + }); + client_stream.on('status', function (status) { + assert.strictEqual(status.code, 1); + assert.strictEqual(status.details, expectedStatus); + done(); + }); + client_stream.write(message); + client_stream.end(); + }); + + it('with server streaming call', function(done) { + var message = {value: 'error'}; + var server_stream = client.echoServerStream(message, metadata, options); + server_stream.on('error', function (err) { + }); + server_stream.on('data', function (data) { + }); + server_stream.on('status', function (status) { + assert.strictEqual(status.code, 1); + assert.strictEqual(status.details, expectedStatus); + done(); + }); + }); + + it('with bidi streaming call', function(done) { + var message = {value: 'error'}; + var bidi_stream = client.echoBidiStream(metadata, options); + bidi_stream.on('error', function(err) {}); + bidi_stream.on('data', function(data) {}); + bidi_stream.on('status', function(status) { + assert.strictEqual(status.code, 1); + assert.strictEqual(status.details, expectedStatus); + done(); + }); + bidi_stream.write(message); + bidi_stream.end(); + }); + }); + describe('delay streaming headers', function() { + var options; + var metadata = new Metadata(); + before(function() { + var foo_interceptor = function (options, nextCall) { + var startNext; + var startListener; + var startMetadata; + var methods = { + start: function (metadata, listener, next) { + startNext = next; + startListener = listener; + startMetadata = metadata; + }, + sendMessage: function (message, next) { + startMetadata.set('fromMessage', message.value); + startNext(startMetadata, startListener); + next(message); + } + }; + return new InterceptingCall(nextCall(options), methods); + }; + options = { interceptors: [foo_interceptor] }; + }); + + it('with client streaming call', function (done) { + var message = { value: 'foo' }; + var client_stream = client.echoClientStream(metadata, options, + function () { }); + client_stream.on('metadata', function (metadata) { + assert.equal(metadata.get('fromMessage'), 'foo'); + done(); + }); + client_stream.write(message); + client_stream.end(); + }); + it('with bidi streaming call', function (done) { + var message = { value: 'foo' }; + var bidi_stream = client.echoBidiStream(metadata, options); + bidi_stream.on('metadata', function (metadata) { + assert.equal(metadata.get('fromMessage'), 'foo'); + done(); + }); + bidi_stream.write(message); + bidi_stream.end(); + }); + }); + + describe.only('order of operations enforced for async interceptors', function() { + it('with unary call', function(done) { + var expected_calls = [ + 'close_b', + 'message_b', + 'start_b', + 'done' + ]; + var registry = new CallRegistry(done, expected_calls, true, true); + var message = {value: 'foo'}; + var interceptor_a = function(options, nextCall) { + return new InterceptingCall(nextCall(options), { + start: function(metadata, listener, next) { + setTimeout(function() { next(metadata, listener); }, 50); + }, + sendMessage: function(message, next) { + setTimeout(function () { next(message); }, 10); + } + }); + }; + var interceptor_b = function(options, nextCall) { + return new InterceptingCall(nextCall(options), { + start: function(metadata, listener, next) { + registry.addCall('start_b'); + next(metadata, listener); + }, + sendMessage: function(message, next) { + registry.addCall('message_b'); + next(message); + }, + halfClose: function(next) { + registry.addCall('close_b'); + next(); + } + }); + }; + var options = { + interceptors: [interceptor_a, interceptor_b] + }; + client.echo(message, options, function(err, response) { + assert.strictEqual(err, null); + registry.addCall('done'); + }); + }); + it('with serverStreaming call', function(done) { + var expected_calls = [ + 'close_b', + 'message_b', + 'start_b', + 'done' + ]; + var registry = new CallRegistry(done, expected_calls, true, true); + var message = {value: 'foo'}; + var interceptor_a = function(options, nextCall) { + return new InterceptingCall(nextCall(options), { + start: function(metadata, listener, next) { + setTimeout(function() { next(metadata, listener); }, 50); + }, + sendMessage: function(message, next) { + setTimeout(function () { next(message); }, 10); + } + }); + }; + var interceptor_b = function(options, nextCall) { + return new InterceptingCall(nextCall(options), { + start: function(metadata, listener, next) { + registry.addCall('start_b'); + next(metadata, listener); + }, + sendMessage: function(message, next) { + registry.addCall('message_b'); + next(message); + }, + halfClose: function(next) { + registry.addCall('close_b'); + next(); + } + }); + }; + var options = { + interceptors: [interceptor_a, interceptor_b] + }; + var stream = client.echoServerStream(message, options); + stream.on('data', function(response) { + assert.strictEqual(response.value, 'foo'); + registry.addCall('done'); + }); + }); + }); +});