grpc-js: Add callInvocationTransformer option

This commit is contained in:
Michael Lumish 2020-03-16 12:07:35 -07:00
parent 2ca96a322f
commit a2a408b777
2 changed files with 149 additions and 72 deletions

View File

@ -34,6 +34,7 @@ export type ServiceError = StatusObject & Error;
* A base type for all user-facing values returned by client-side method calls.
*/
export type SurfaceCall = {
call?: InterceptingCallInterface;
cancel(): void;
getPeer(): string;
} & EmitterAugmentation1<'metadata', Metadata> &
@ -82,56 +83,57 @@ export function callErrorFromStatus(status: StatusObject): ServiceError {
export class ClientUnaryCallImpl extends EventEmitter
implements ClientUnaryCall {
constructor(private readonly call: InterceptingCallInterface) {
public call?: InterceptingCallInterface;
constructor() {
super();
}
cancel(): void {
this.call.cancelWithStatus(Status.CANCELLED, 'Cancelled on client');
this.call?.cancelWithStatus(Status.CANCELLED, 'Cancelled on client');
}
getPeer(): string {
return this.call.getPeer();
return this.call?.getPeer() ?? '';
}
}
export class ClientReadableStreamImpl<ResponseType> extends Readable
implements ClientReadableStream<ResponseType> {
public call?: InterceptingCallInterface;
constructor(
private readonly call: InterceptingCallInterface,
readonly deserialize: (chunk: Buffer) => ResponseType
) {
super({ objectMode: true });
}
cancel(): void {
this.call.cancelWithStatus(Status.CANCELLED, 'Cancelled on client');
this.call?.cancelWithStatus(Status.CANCELLED, 'Cancelled on client');
}
getPeer(): string {
return this.call.getPeer();
return this.call?.getPeer() ?? '';
}
_read(_size: number): void {
this.call.startRead();
this.call?.startRead();
}
}
export class ClientWritableStreamImpl<RequestType> extends Writable
implements ClientWritableStream<RequestType> {
public call?: InterceptingCallInterface;
constructor(
private readonly call: InterceptingCallInterface,
readonly serialize: (value: RequestType) => Buffer
) {
super({ objectMode: true });
}
cancel(): void {
this.call.cancelWithStatus(Status.CANCELLED, 'Cancelled on client');
this.call?.cancelWithStatus(Status.CANCELLED, 'Cancelled on client');
}
getPeer(): string {
return this.call.getPeer();
return this.call?.getPeer() ?? '';
}
_write(chunk: RequestType, encoding: string, cb: WriteCallback) {
@ -142,19 +144,19 @@ export class ClientWritableStreamImpl<RequestType> extends Writable
if (!Number.isNaN(flags)) {
context.flags = flags;
}
this.call.sendMessageWithContext(context, chunk);
this.call?.sendMessageWithContext(context, chunk);
}
_final(cb: Function) {
this.call.halfClose();
this.call?.halfClose();
cb();
}
}
export class ClientDuplexStreamImpl<RequestType, ResponseType> extends Duplex
implements ClientDuplexStream<RequestType, ResponseType> {
public call?: InterceptingCallInterface;
constructor(
private readonly call: InterceptingCallInterface,
readonly serialize: (value: RequestType) => Buffer,
readonly deserialize: (chunk: Buffer) => ResponseType
) {
@ -162,15 +164,15 @@ export class ClientDuplexStreamImpl<RequestType, ResponseType> extends Duplex
}
cancel(): void {
this.call.cancelWithStatus(Status.CANCELLED, 'Cancelled on client');
this.call?.cancelWithStatus(Status.CANCELLED, 'Cancelled on client');
}
getPeer(): string {
return this.call.getPeer();
return this.call?.getPeer() ?? '';
}
_read(_size: number): void {
this.call.startRead();
this.call?.startRead();
}
_write(chunk: RequestType, encoding: string, cb: WriteCallback) {
@ -181,11 +183,11 @@ export class ClientDuplexStreamImpl<RequestType, ResponseType> extends Duplex
if (!Number.isNaN(flags)) {
context.flags = flags;
}
this.call.sendMessageWithContext(context, chunk);
this.call?.sendMessageWithContext(context, chunk);
}
_final(cb: Function) {
this.call.halfClose();
this.call?.halfClose();
cb();
}
}

View File

@ -40,7 +40,7 @@ import { ChannelCredentials } from './channel-credentials';
import { ChannelOptions } from './channel-options';
import { Status } from './constants';
import { Metadata } from './metadata';
import { ClientMethodDefinition } from './make-client';
import { ClientMethodDefinition, MethodDefinition } from './make-client';
import {
getInterceptingCall,
Interceptor,
@ -52,6 +52,7 @@ import {
const CHANNEL_SYMBOL = Symbol();
const INTERCEPTOR_SYMBOL = Symbol();
const INTERCEPTOR_PROVIDER_SYMBOL = Symbol();
const CALL_INVOCATION_TRANSFORMER_SYMBOL = Symbol();
export interface UnaryCallback<ResponseType> {
(err: ServiceError | null, value?: ResponseType): void;
@ -68,6 +69,20 @@ export interface CallOptions {
interceptor_providers?: InterceptorProvider[];
}
export interface CallProperties<RequestType, ResponseType> {
argument?: RequestType;
metadata: Metadata;
call: SurfaceCall;
channel: Channel;
methodDefinition: ClientMethodDefinition<RequestType, ResponseType>;
callOptions: CallOptions;
callback?: UnaryCallback<ResponseType>
}
export interface CallInvocationTransformer {
(callProperties: CallProperties<any, any>): CallProperties<any, any>
}
export type ClientOptions = Partial<ChannelOptions> & {
channelOverride?: Channel;
channelFactoryOverride?: (
@ -77,6 +92,7 @@ export type ClientOptions = Partial<ChannelOptions> & {
) => Channel;
interceptors?: Interceptor[];
interceptor_providers?: InterceptorProvider[];
callInvocationTransformer?: CallInvocationTransformer;
};
/**
@ -87,6 +103,7 @@ export class Client {
private readonly [CHANNEL_SYMBOL]: Channel;
private readonly [INTERCEPTOR_SYMBOL]: Interceptor[];
private readonly [INTERCEPTOR_PROVIDER_SYMBOL]: InterceptorProvider[];
private readonly [CALL_INVOCATION_TRANSFORMER_SYMBOL]?: CallInvocationTransformer;
constructor(
address: string,
credentials: ChannelCredentials,
@ -118,6 +135,7 @@ export class Client {
'to the client constructor. Only one of these is allowed.'
);
}
this[CALL_INVOCATION_TRANSFORMER_SYMBOL] = options.callInvocationTransformer;
}
close(): void {
@ -230,9 +248,9 @@ export class Client {
options?: CallOptions | UnaryCallback<ResponseType>,
callback?: UnaryCallback<ResponseType>
): ClientUnaryCall {
({ metadata, options, callback } = this.checkOptionalUnaryResponseArguments<
const checkedArguments = this.checkOptionalUnaryResponseArguments<
ResponseType
>(metadata, options, callback));
>(metadata, options, callback);
const methodDefinition: ClientMethodDefinition<
RequestType,
ResponseType
@ -243,25 +261,42 @@ export class Client {
requestSerialize: serialize,
responseDeserialize: deserialize,
};
let callProperties: CallProperties<RequestType, ResponseType> = {
argument: argument,
metadata: checkedArguments.metadata,
call: new ClientUnaryCallImpl(),
channel: this[CHANNEL_SYMBOL],
methodDefinition: methodDefinition,
callOptions: checkedArguments.options,
callback: checkedArguments.callback
};
if (this[CALL_INVOCATION_TRANSFORMER_SYMBOL]) {
callProperties = this[CALL_INVOCATION_TRANSFORMER_SYMBOL]!(callProperties) as CallProperties<RequestType, ResponseType>;
}
const emitter: ClientUnaryCall = callProperties.call;
const interceptorArgs: InterceptorArguments = {
clientInterceptors: this[INTERCEPTOR_SYMBOL],
clientInterceptorProviders: this[INTERCEPTOR_PROVIDER_SYMBOL],
callInterceptors: options.interceptors ?? [],
callInterceptorProviders: options.interceptor_providers ?? [],
callInterceptors: callProperties.callOptions.interceptors ?? [],
callInterceptorProviders: callProperties.callOptions.interceptor_providers ?? [],
};
const call: InterceptingCallInterface = getInterceptingCall(
interceptorArgs,
methodDefinition,
options,
this[CHANNEL_SYMBOL]
callProperties.methodDefinition,
callProperties.callOptions,
callProperties.channel
);
if (options.credentials) {
call.setCredentials(options.credentials);
/* This needs to happen before the emitter is used. Unfortunately we can't
* enforce this with the type system. We need to construct this emitter
* before calling the CallInvocationTransformer, and we need to create the
* call after that. */
emitter.call = call;
if (callProperties.callOptions.credentials) {
call.setCredentials(callProperties.callOptions.credentials);
}
const emitter = new ClientUnaryCallImpl(call);
let responseMessage: ResponseType | null = null;
let receivedStatus = false;
call.start(metadata, {
call.start(callProperties.metadata, {
onReceiveMetadata: metadata => {
emitter.emit('metadata', metadata);
},
@ -278,9 +313,9 @@ export class Client {
}
receivedStatus = true;
if (status.code === Status.OK) {
callback!(null, responseMessage!);
callProperties.callback!(null, responseMessage!);
} else {
callback!(callErrorFromStatus(status));
callProperties.callback!(callErrorFromStatus(status));
}
emitter.emit('status', status);
},
@ -326,9 +361,9 @@ export class Client {
options?: CallOptions | UnaryCallback<ResponseType>,
callback?: UnaryCallback<ResponseType>
): ClientWritableStream<RequestType> {
({ metadata, options, callback } = this.checkOptionalUnaryResponseArguments<
const checkedArguments = this.checkOptionalUnaryResponseArguments<
ResponseType
>(metadata, options, callback));
>(metadata, options, callback);
const methodDefinition: ClientMethodDefinition<
RequestType,
ResponseType
@ -339,25 +374,41 @@ export class Client {
requestSerialize: serialize,
responseDeserialize: deserialize,
};
let callProperties: CallProperties<RequestType, ResponseType> = {
metadata: checkedArguments.metadata,
call: new ClientWritableStreamImpl<RequestType>(serialize),
channel: this[CHANNEL_SYMBOL],
methodDefinition: methodDefinition,
callOptions: checkedArguments.options,
callback: checkedArguments.callback
};
if (this[CALL_INVOCATION_TRANSFORMER_SYMBOL]) {
callProperties = this[CALL_INVOCATION_TRANSFORMER_SYMBOL]!(callProperties) as CallProperties<RequestType, ResponseType>;
}
const emitter: ClientWritableStream<RequestType> = callProperties.call as ClientWritableStream<RequestType>;
const interceptorArgs: InterceptorArguments = {
clientInterceptors: this[INTERCEPTOR_SYMBOL],
clientInterceptorProviders: this[INTERCEPTOR_PROVIDER_SYMBOL],
callInterceptors: options.interceptors ?? [],
callInterceptorProviders: options.interceptor_providers ?? [],
callInterceptors: callProperties.callOptions.interceptors ?? [],
callInterceptorProviders: callProperties.callOptions.interceptor_providers ?? [],
};
const call: InterceptingCallInterface = getInterceptingCall(
interceptorArgs,
methodDefinition,
options,
this[CHANNEL_SYMBOL]
callProperties.methodDefinition,
callProperties.callOptions,
callProperties.channel
);
if (options.credentials) {
call.setCredentials(options.credentials);
/* This needs to happen before the emitter is used. Unfortunately we can't
* enforce this with the type system. We need to construct this emitter
* before calling the CallInvocationTransformer, and we need to create the
* call after that. */
emitter.call = call;
if (callProperties.callOptions.credentials) {
call.setCredentials(callProperties.callOptions.credentials);
}
const emitter = new ClientWritableStreamImpl<RequestType>(call, serialize);
let responseMessage: ResponseType | null = null;
let receivedStatus = false;
call.start(metadata, {
call.start(callProperties.metadata, {
onReceiveMetadata: metadata => {
emitter.emit('metadata', metadata);
},
@ -374,9 +425,9 @@ export class Client {
}
receivedStatus = true;
if (status.code === Status.OK) {
callback!(null, responseMessage!);
callProperties.callback!(null, responseMessage!);
} else {
callback!(callErrorFromStatus(status));
callProperties.callback!(callErrorFromStatus(status));
}
emitter.emit('status', status);
},
@ -431,7 +482,7 @@ export class Client {
metadata?: Metadata | CallOptions,
options?: CallOptions
): ClientReadableStream<ResponseType> {
({ metadata, options } = this.checkMetadataAndOptions(metadata, options));
const checkedArguments = this.checkMetadataAndOptions(metadata, options);
const methodDefinition: ClientMethodDefinition<
RequestType,
ResponseType
@ -442,27 +493,40 @@ export class Client {
requestSerialize: serialize,
responseDeserialize: deserialize,
};
let callProperties: CallProperties<RequestType, ResponseType> = {
argument: argument,
metadata: checkedArguments.metadata,
call: new ClientReadableStreamImpl<ResponseType>(deserialize),
channel: this[CHANNEL_SYMBOL],
methodDefinition: methodDefinition,
callOptions: checkedArguments.options
};
if (this[CALL_INVOCATION_TRANSFORMER_SYMBOL]) {
callProperties = this[CALL_INVOCATION_TRANSFORMER_SYMBOL]!(callProperties) as CallProperties<RequestType, ResponseType>;
}
const stream: ClientReadableStream<ResponseType> = callProperties.call as ClientReadableStream<ResponseType>;
const interceptorArgs: InterceptorArguments = {
clientInterceptors: this[INTERCEPTOR_SYMBOL],
clientInterceptorProviders: this[INTERCEPTOR_PROVIDER_SYMBOL],
callInterceptors: options.interceptors ?? [],
callInterceptorProviders: options.interceptor_providers ?? [],
callInterceptors: callProperties.callOptions.interceptors ?? [],
callInterceptorProviders: callProperties.callOptions.interceptor_providers ?? [],
};
const call: InterceptingCallInterface = getInterceptingCall(
interceptorArgs,
methodDefinition,
options,
this[CHANNEL_SYMBOL]
callProperties.methodDefinition,
callProperties.callOptions,
callProperties.channel
);
if (options.credentials) {
call.setCredentials(options.credentials);
/* This needs to happen before the emitter is used. Unfortunately we can't
* enforce this with the type system. We need to construct this emitter
* before calling the CallInvocationTransformer, and we need to create the
* call after that. */
stream.call = call;
if (callProperties.callOptions.credentials) {
call.setCredentials(callProperties.callOptions.credentials);
}
const stream = new ClientReadableStreamImpl<ResponseType>(
call,
deserialize
);
let receivedStatus = false;
call.start(metadata, {
call.start(callProperties.metadata, {
onReceiveMetadata(metadata: Metadata) {
stream.emit('metadata', metadata);
},
@ -509,7 +573,7 @@ export class Client {
metadata?: Metadata | CallOptions,
options?: CallOptions
): ClientDuplexStream<RequestType, ResponseType> {
({ metadata, options } = this.checkMetadataAndOptions(metadata, options));
const checkedArguments = this.checkMetadataAndOptions(metadata, options);
const methodDefinition: ClientMethodDefinition<
RequestType,
ResponseType
@ -520,28 +584,39 @@ export class Client {
requestSerialize: serialize,
responseDeserialize: deserialize,
};
let callProperties: CallProperties<RequestType, ResponseType> = {
metadata: checkedArguments.metadata,
call: new ClientDuplexStreamImpl<RequestType, ResponseType>(serialize, deserialize),
channel: this[CHANNEL_SYMBOL],
methodDefinition: methodDefinition,
callOptions: checkedArguments.options
};
if (this[CALL_INVOCATION_TRANSFORMER_SYMBOL]) {
callProperties = this[CALL_INVOCATION_TRANSFORMER_SYMBOL]!(callProperties) as CallProperties<RequestType, ResponseType>;
}
const stream: ClientDuplexStream<RequestType, ResponseType> = callProperties.call as ClientDuplexStream<RequestType, ResponseType>;
const interceptorArgs: InterceptorArguments = {
clientInterceptors: this[INTERCEPTOR_SYMBOL],
clientInterceptorProviders: this[INTERCEPTOR_PROVIDER_SYMBOL],
callInterceptors: options.interceptors ?? [],
callInterceptorProviders: options.interceptor_providers ?? [],
callInterceptors: callProperties.callOptions.interceptors ?? [],
callInterceptorProviders: callProperties.callOptions.interceptor_providers ?? [],
};
const call: InterceptingCallInterface = getInterceptingCall(
interceptorArgs,
methodDefinition,
options,
this[CHANNEL_SYMBOL]
callProperties.methodDefinition,
callProperties.callOptions,
callProperties.channel
);
if (options.credentials) {
call.setCredentials(options.credentials);
/* This needs to happen before the emitter is used. Unfortunately we can't
* enforce this with the type system. We need to construct this emitter
* before calling the CallInvocationTransformer, and we need to create the
* call after that. */
stream.call = call;
if (callProperties.callOptions.credentials) {
call.setCredentials(callProperties.callOptions.credentials);
}
const stream = new ClientDuplexStreamImpl<RequestType, ResponseType>(
call,
serialize,
deserialize
);
let receivedStatus = false;
call.start(metadata, {
call.start(callProperties.metadata, {
onReceiveMetadata(metadata: Metadata) {
stream.emit('metadata', metadata);
},