Add client interceptors

This commit is contained in:
murgatroid99 2019-10-25 10:26:58 -07:00
parent 3144cb6ada
commit acbf17d9b7
8 changed files with 2365 additions and 106 deletions

View File

@ -86,20 +86,40 @@ export interface InterceptingListener {
onReceiveStatus(status: StatusObject): void; onReceiveStatus(status: StatusObject): void;
} }
class InterceptingListenerImpl implements InterceptingListener { export function isInterceptingListener(listener: Listener | InterceptingListener): listener is InterceptingListener {
return listener.onReceiveMetadata !== undefined && listener.onReceiveMetadata.length === 1;
}
export class InterceptingListenerImpl implements InterceptingListener {
private processingMessage = false;
private pendingStatus: StatusObject | null = null;
constructor(private listener: FullListener, private nextListener: InterceptingListener) {} constructor(private listener: FullListener, private nextListener: InterceptingListener) {}
onReceiveMetadata(metadata: Metadata): void { onReceiveMetadata(metadata: Metadata): void {
const next = this.nextListener.onReceiveMetadata.bind(this.nextListener); this.listener.onReceiveMetadata(metadata, (metadata) => {
this.listener.onReceiveMetadata(metadata, next); this.nextListener.onReceiveMetadata(metadata);
});
} }
onReceiveMessage(message: any): void { onReceiveMessage(message: any): void {
const next = this.nextListener.onReceiveMessage.bind(this.nextListener); /* If this listener processes messages asynchronously, the last message may
this.listener.onReceiveMessage(message, next); * be reordered with respect to the status */
this.processingMessage = true;
this.listener.onReceiveMessage(message, (msg) => {
this.processingMessage = false;
this.nextListener.onReceiveMessage(msg);
if (this.pendingStatus) {
this.nextListener.onReceiveStatus(this.pendingStatus);
}
});
} }
onReceiveStatus(status: StatusObject): void { onReceiveStatus(status: StatusObject): void {
const next = this.nextListener.onReceiveStatus.bind(this.nextListener); this.listener.onReceiveStatus(status, (processedStatus) => {
this.listener.onReceiveStatus(status, next); if (this.processingMessage) {
this.pendingStatus = processedStatus;
} else {
this.nextListener.onReceiveStatus(processedStatus);
}
});
} }
} }
@ -107,11 +127,16 @@ export interface WriteCallback {
(error?: Error | null): void; (error?: Error | null): void;
} }
export type Call = { export interface MessageContext {
callback?: WriteCallback;
flags?: number;
}
export interface Call {
cancelWithStatus(status: Status, details: string): void; cancelWithStatus(status: Status, details: string): void;
getPeer(): string; getPeer(): string;
start(metadata: Metadata, listener: InterceptingListener): void; start(metadata: Metadata, listener: InterceptingListener): void;
write(writeObj: WriteObject, callback: WriteCallback): void; sendMessageWithContext(context: MessageContext, message: any): void;
startRead(): void; startRead(): void;
halfClose(): void; halfClose(): void;
@ -214,6 +239,13 @@ export class Http2CallStream implements Call {
} }
} else { } else {
this.listener!.onReceiveMessage(message); this.listener!.onReceiveMessage(message);
/* Don't wait for the upper layer to ask for a read before pushing null
* to close out the call, because pushing null doesn't actually push
* another message up to the upper layer */
if (this.unpushedReadMessages.length > 0 && this.unpushedReadMessages[0] === null) {
this.unpushedReadMessages.shift();
this.push(null);
}
} }
} }
@ -495,7 +527,12 @@ export class Http2CallStream implements Call {
} }
} }
write(writeObj: WriteObject, cb: WriteCallback) { sendMessageWithContext(context: MessageContext, message: Buffer) {
const writeObj: WriteObject = {
message: message,
flags: context.flags
};
const cb: WriteCallback = context.callback || (() => {});
this.filterStack.sendMessage(Promise.resolve(writeObj)).then(message => { this.filterStack.sendMessage(Promise.resolve(writeObj)).then(message => {
if (this.http2Stream === null) { if (this.http2Stream === null) {
this.pendingWrite = message.message; this.pendingWrite = message.message;

View File

@ -160,7 +160,12 @@ export class ClientWritableStreamImpl<RequestType> extends Writable
} }
_write(chunk: RequestType, encoding: string, cb: WriteCallback) { _write(chunk: RequestType, encoding: string, cb: WriteCallback) {
tryWrite<RequestType>(this.call, this.serialize, chunk, encoding, cb); const writeObj: WriteObject = { message: chunk };
const flags: number = Number(encoding);
if (!Number.isNaN(flags)) {
writeObj.flags = flags;
}
this.call.write(writeObj, cb);
} }
_final(cb: Function) { _final(cb: Function) {
@ -192,7 +197,12 @@ export class ClientDuplexStreamImpl<RequestType, ResponseType> extends Duplex
} }
_write(chunk: RequestType, encoding: string, cb: WriteCallback) { _write(chunk: RequestType, encoding: string, cb: WriteCallback) {
tryWrite<RequestType>(this.call, this.serialize, chunk, encoding, cb); const writeObj: WriteObject = { message: chunk };
const flags: number = Number(encoding);
if (!Number.isNaN(flags)) {
writeObj.flags = flags;
}
this.call.write(writeObj, cb);
} }
_final(cb: Function) { _final(cb: Function) {

View File

@ -0,0 +1,423 @@
/*
* Copyright 2019 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
import { Metadata } from './metadata';
import { StatusObject, CallStreamOptions, Listener, MetadataListener, MessageListener, StatusListener, FullListener, InterceptingListener, WriteObject, WriteCallback, InterceptingListenerImpl, isInterceptingListener, MessageContext, Http2CallStream, Deadline, Call } from './call-stream';
import { Status } from './constants';
import { Channel } from './channel';
import { CallOptions } from './client';
import { CallCredentials } from './call-credentials';
import { ClientMethodDefinition, Serialize } from './make-client';
export class InterceptorConfigurationError extends Error {
constructor(message: string) {
super(message);
this.name = 'InterceptorConfigurationError';
Error.captureStackTrace(this, InterceptorConfigurationError);
}
}
export interface MetadataRequester {
(metadata: Metadata, listener: InterceptingListener, next: (metadata: Metadata, listener: InterceptingListener | Listener) => void): void;
}
export interface MessageRequester {
(message: any, next: (message: any) => void): void;
}
export interface CloseRequester {
(next: () => void): void;
}
export interface CancelRequester {
(next: () => void): void;
}
export interface FullRequester {
start: MetadataRequester;
sendMessage: MessageRequester;
halfClose: CloseRequester;
cancel: CancelRequester;
}
export type Requester = Partial<FullRequester>;
export class ListenerBuilder {
private metadata: MetadataListener | undefined = undefined;
private message: MessageListener | undefined = undefined;
private status: StatusListener | undefined = undefined;
withOnReceiveMetadata(onReceiveMetadata: MetadataListener): this {
this.metadata = onReceiveMetadata;
return this;
}
withOnReceiveMessage(onReceiveMessage: MessageListener): this {
this.message = onReceiveMessage;
return this;
}
withOnReceiveStatus(onReceiveStatus: StatusListener): this {
this.status = onReceiveStatus;
return this;
}
build(): Listener {
return {
onReceiveMetadata: this.metadata,
onReceiveMessage: this.message,
onReceiveStatus: this.status
}
}
}
export class RequesterBuilder {
private start: MetadataRequester | undefined = undefined;
private message: MessageRequester | undefined = undefined;
private halfClose: CloseRequester | undefined = undefined;
private cancel: CancelRequester | undefined = undefined;
withStart(start: MetadataRequester): this {
this.start = start;
return this;
}
withSendMessage(sendMessage: MessageRequester): this {
this.message = sendMessage;
return this;
}
withHalfClose(halfClose: CloseRequester): this {
this.halfClose = halfClose;
return this;
}
withCancel(cancel: CancelRequester): this {
this.cancel = cancel;
return this;
}
build(): Requester {
return {
start: this.start,
sendMessage: this.message,
halfClose: this.halfClose,
cancel: this.cancel
};
}
}
const defaultListener: FullListener = {
onReceiveMetadata: (metadata, next) => {
next(metadata);
},
onReceiveMessage: (message, next) => {
next(message);
},
onReceiveStatus: (status, next) => {
next(status);
}
};
const defaultRequester: FullRequester = {
start: (metadata, listener, next) => {
next(metadata, listener);
},
sendMessage: (message, next) => {
next(message);
},
halfClose: (next) => {
next();
},
cancel: (next) => {
next();
}
}
export interface InterceptorOptions extends CallOptions {
method_definition: ClientMethodDefinition<any, any>;
}
export interface InterceptingCallInterface {
cancelWithStatus(status: Status, details: string): void;
getPeer(): string;
start(metadata: Metadata, listener: InterceptingListener): void;
sendMessageWithContext(context: MessageContext, message: any): void;
startRead(): void;
halfClose(): void;
getDeadline(): Deadline;
getCredentials(): CallCredentials;
setCredentials(credentials: CallCredentials): void;
getMethod(): string;
getHost(): string;
}
export class InterceptingCall implements InterceptingCallInterface {
private requester: FullRequester;
private processingMessage = false;
private pendingHalfClose = false;
constructor(private nextCall: InterceptingCallInterface, requester?: Requester) {
if (requester) {
// Undefined elements overwrite, unset ones do not
this.requester = {
start: requester.start || defaultRequester.start,
sendMessage: requester.sendMessage || defaultRequester.sendMessage,
halfClose: requester.halfClose || defaultRequester.halfClose,
cancel: requester.cancel || defaultRequester.cancel
}
} else {
this.requester = defaultRequester;
}
}
cancelWithStatus(status: Status, details: string) {
this.requester.cancel(() => {
this.nextCall.cancelWithStatus(status, details);
});
}
getPeer() {
return this.nextCall.getPeer();
}
start(metadata: Metadata, interceptingListener: InterceptingListener): void {
this.requester.start(metadata, interceptingListener, (md, listener) => {
let finalInterceptingListener: InterceptingListener;
if (isInterceptingListener(listener)) {
finalInterceptingListener = listener;
} else {
const fullListener: FullListener = {
onReceiveMetadata: listener.onReceiveMetadata || defaultListener.onReceiveMetadata,
onReceiveMessage: listener.onReceiveMessage || defaultListener.onReceiveMessage,
onReceiveStatus: listener.onReceiveStatus || defaultListener.onReceiveStatus
};
finalInterceptingListener = new InterceptingListenerImpl(fullListener, interceptingListener);
}
this.nextCall.start(md, finalInterceptingListener);
});
}
sendMessageWithContext(context: MessageContext, message: any): void {
this.processingMessage = true;
this.requester.sendMessage(message, (finalMessage) => {
this.processingMessage = false;
this.nextCall.sendMessageWithContext(context, finalMessage);
if (this.pendingHalfClose) {
this.nextCall.halfClose();
}
})
}
sendMessage(message: any): void {
this.sendMessageWithContext({}, message);
}
startRead(): void {
this.nextCall.startRead();
}
halfClose(): void {
this.requester.halfClose(() => {
if (this.processingMessage) {
this.pendingHalfClose = true;
} else {
this.nextCall.halfClose();
}
});
}
getDeadline(): number | Date {
return this.nextCall.getDeadline();
}
getCredentials(): CallCredentials {
return this.nextCall.getCredentials();
}
setCredentials(credentials: CallCredentials): void {
this.nextCall.setCredentials(credentials);
}
getMethod(): string {
return this.nextCall.getHost();
}
getHost(): string {
return this.nextCall.getHost();
}
}
function getCall(channel: Channel, path: string, options: CallOptions): Call {
var deadline;
var host;
var parent;
var propagate_flags;
var credentials;
if (options) {
deadline = options.deadline;
host = options.host;
propagate_flags = options.propagate_flags;
credentials = options.credentials;
}
if (deadline === undefined) {
deadline = Infinity;
}
var call = channel.createCall(path, deadline, host,
parent, propagate_flags);
if (credentials) {
call.setCredentials(credentials);
}
return call;
}
class BaseInterceptingCall implements InterceptingCallInterface {
constructor(protected call: Call, protected methodDefinition: ClientMethodDefinition<any, any>) {}
cancelWithStatus(status: Status, details: string): void {
this.call.cancelWithStatus(status, details);
}
getPeer(): string {
return this.call.getPeer();
}
getDeadline(): number | Date {
return this.call.getDeadline();
}
getCredentials(): CallCredentials {
return this.call.getCredentials();
}
setCredentials(credentials: CallCredentials): void {
this.call.setCredentials(credentials);
}
getMethod(): string {
return this.call.getMethod();
}
getHost(): string {
return this.call.getHost();
}
sendMessageWithContext(context: MessageContext, message: any): void {
let serialized: Buffer;
try {
serialized = this.methodDefinition.requestSerialize(message);
this.call.sendMessageWithContext(context, serialized);
} catch (e) {
this.call.cancelWithStatus(Status.INTERNAL, 'Serialization failure');
}
}
start(metadata: Metadata, listener: InterceptingListener): void {
let readError: StatusObject | null = null;
this.call.start(metadata, {
onReceiveMetadata: (metadata) => {
listener.onReceiveMetadata(metadata);
},
onReceiveMessage: (message) => {
let deserialized: any;
try {
deserialized = this.methodDefinition.responseDeserialize(message);
listener.onReceiveMessage(deserialized);
} catch (e) {
readError = {code: Status.INTERNAL, details: 'Failed to parse server response', metadata: new Metadata()};
this.call.cancelWithStatus(readError.code, readError.details);
}
},
onReceiveStatus: (status) => {
if (readError) {
listener.onReceiveStatus(readError);
} else {
listener.onReceiveStatus(status);
}
}
});
}
startRead() {
this.call.startRead();
}
halfClose(): void {
this.call.halfClose();
}
}
class BaseUnaryInterceptingCall extends BaseInterceptingCall implements InterceptingCallInterface {
constructor(call: Call, methodDefinition: ClientMethodDefinition<any, any>) {
super(call, methodDefinition);
}
start(metadata: Metadata, listener: InterceptingListener): void {
super.start(metadata, listener);
this.call.startRead();
}
}
class BaseStreamingInterceptingCall extends BaseInterceptingCall implements InterceptingCallInterface { }
function getBottomInterceptingCall(channel: Channel, path: string, options: InterceptorOptions, methodDefinition: ClientMethodDefinition<any, any>) {
const call = getCall(channel, path, options);
if (methodDefinition.responseStream) {
return new BaseStreamingInterceptingCall(call, methodDefinition);
} else {
return new BaseUnaryInterceptingCall(call, methodDefinition);
}
}
export interface NextCall {
(options: InterceptorOptions): InterceptingCallInterface;
}
export interface Interceptor {
(options: InterceptorOptions, nextCall: NextCall): InterceptingCall
}
export interface InterceptorProvider {
(methodDefinition: ClientMethodDefinition<any, any>): Interceptor;
}
export interface InterceptorArguments {
clientInterceptors: Interceptor[],
clientInterceptorProviders: InterceptorProvider[],
callInterceptors: Interceptor[],
callInterceptorProviders: InterceptorProvider[]
}
export function getInterceptingCall(interceptorArgs: InterceptorArguments, methodDefinition: ClientMethodDefinition<any, any>, options: CallOptions, channel: Channel): InterceptingCallInterface {
if (interceptorArgs.clientInterceptors.length > 0 && interceptorArgs.clientInterceptorProviders.length > 0) {
throw new InterceptorConfigurationError(
'Both interceptors and interceptor_providers were passed as options ' +
'to the client constructor. Only one of these is allowed.'
);
}
if (interceptorArgs.callInterceptors.length > 0 && interceptorArgs.callInterceptorProviders.length > 0) {
throw new InterceptorConfigurationError(
'Both interceptors and interceptor_providers were passed as call ' +
'options. Only one of these is allowed.'
);
}
let interceptors: Interceptor[] = [];
// Interceptors passed to the call override interceptors passed to the client constructor
if (interceptorArgs.callInterceptors.length > 0 || interceptorArgs.callInterceptorProviders.length > 0) {
interceptors = ([] as Interceptor[]).concat(
interceptorArgs.callInterceptors,
interceptorArgs.callInterceptorProviders.map(provider => provider(methodDefinition))
).filter(interceptor => interceptor);
// Filter out falsy values when providers return nothing
} else {
interceptors = ([] as Interceptor[]).concat(
interceptorArgs.clientInterceptors,
interceptorArgs.clientInterceptorProviders.map(provider => provider(methodDefinition))
).filter(interceptor => interceptor);
// Filter out falsy values when providers return nothing
}
const interceptorOptions = Object.assign({}, options, {method_definition: methodDefinition});
/* For each interceptor in the list, the nextCall function passed to it is
* based on the next interceptor in the list, using a nextCall function
* constructed with the following interceptor in the list, and so on. The
* initialValue, which is effectively at the end of the list, is a nextCall
* function that invokes getBottomInterceptingCall, which handles
* (de)serialization and also gets the underlying call from the channel */
const getCall: NextCall = interceptors.reduceRight<NextCall>((previousValue: NextCall, currentValue: Interceptor) => {
return currentOptions => currentValue(currentOptions, previousValue);
}, (finalOptions: InterceptorOptions) => getBottomInterceptingCall(channel, methodDefinition.path, finalOptions, methodDefinition));
return getCall(interceptorOptions);
}

View File

@ -35,8 +35,12 @@ import { ChannelCredentials } from './channel-credentials';
import { ChannelOptions } from './channel-options'; import { ChannelOptions } from './channel-options';
import { Status } from './constants'; import { Status } from './constants';
import { Metadata } from './metadata'; import { Metadata } from './metadata';
import { ClientMethodDefinition } from './make-client';
import { getInterceptingCall, Interceptor, InterceptorProvider, InterceptorArguments } from './client-interceptors';
const CHANNEL_SYMBOL = Symbol(); const CHANNEL_SYMBOL = Symbol();
const INTERCEPTOR_SYMBOL = Symbol();
const INTERCEPTOR_PROVIDER_SYMBOL = Symbol();
export interface UnaryCallback<ResponseType> { export interface UnaryCallback<ResponseType> {
(err: ServiceError | null, value?: ResponseType): void; (err: ServiceError | null, value?: ResponseType): void;
@ -49,6 +53,8 @@ export interface CallOptions {
* but the server is not yet implemented so it makes no sense to have it */ * but the server is not yet implemented so it makes no sense to have it */
propagate_flags?: number; propagate_flags?: number;
credentials?: CallCredentials; credentials?: CallCredentials;
interceptors?: Interceptor[],
interceptor_providers?: InterceptorProvider[]
} }
export type ClientOptions = Partial<ChannelOptions> & { export type ClientOptions = Partial<ChannelOptions> & {
@ -58,6 +64,8 @@ export type ClientOptions = Partial<ChannelOptions> & {
credentials: ChannelCredentials, credentials: ChannelCredentials,
options: ClientOptions options: ClientOptions
) => Channel; ) => Channel;
interceptors?: Interceptor[],
interceptor_providers?: InterceptorProvider[]
}; };
/** /**
@ -66,6 +74,8 @@ export type ClientOptions = Partial<ChannelOptions> & {
*/ */
export class Client { export class Client {
private readonly [CHANNEL_SYMBOL]: Channel; private readonly [CHANNEL_SYMBOL]: Channel;
private readonly [INTERCEPTOR_SYMBOL]: Interceptor[];
private readonly [INTERCEPTOR_PROVIDER_SYMBOL]: InterceptorProvider[];
constructor( constructor(
address: string, address: string,
credentials: ChannelCredentials, credentials: ChannelCredentials,
@ -86,6 +96,13 @@ export class Client {
options options
); );
} }
this[INTERCEPTOR_SYMBOL] = options.interceptors || [];
this[INTERCEPTOR_PROVIDER_SYMBOL] = options.interceptor_providers || [];
if (this[INTERCEPTOR_SYMBOL].length > 0 && this[INTERCEPTOR_PROVIDER_SYMBOL].length > 0) {
throw new Error(
'Both interceptors and interceptor_providers were passed as options ' +
'to the client constructor. Only one of these is allowed.');
}
} }
close(): void { close(): void {
@ -201,36 +218,35 @@ export class Client {
({ metadata, options, callback } = this.checkOptionalUnaryResponseArguments< ({ metadata, options, callback } = this.checkOptionalUnaryResponseArguments<
ResponseType ResponseType
>(metadata, options, callback)); >(metadata, options, callback));
const call: Call = this[CHANNEL_SYMBOL].createCall( const methodDefinition: ClientMethodDefinition<RequestType, ResponseType> = {
method, path: method,
options.deadline, requestStream: false,
options.host, responseStream: false,
null, requestSerialize: serialize,
options.propagate_flags responseDeserialize: deserialize
); };
const interceptorArgs: InterceptorArguments = {
clientInterceptors: this[INTERCEPTOR_SYMBOL],
clientInterceptorProviders: this[INTERCEPTOR_PROVIDER_SYMBOL],
callInterceptors: options.interceptors || [],
callInterceptorProviders: options.interceptor_providers || []
};
const call: Call = getInterceptingCall(interceptorArgs, methodDefinition, options, this[CHANNEL_SYMBOL]);
if (options.credentials) { if (options.credentials) {
call.setCredentials(options.credentials); call.setCredentials(options.credentials);
} }
const message: Buffer = serialize(argument); const writeObj: WriteObject = { message: argument };
const writeObj: WriteObject = { message };
const emitter = new ClientUnaryCallImpl(call); const emitter = new ClientUnaryCallImpl(call);
let responseMessage: ResponseType | null = null; let responseMessage: ResponseType | null = null;
call.start(metadata, { call.start(metadata, {
onReceiveMetadata: (metadata) => { onReceiveMetadata: (metadata) => {
emitter.emit('metadata', metadata); emitter.emit('metadata', metadata);
}, },
onReceiveMessage(message: Buffer) { onReceiveMessage(message: any) {
if (responseMessage != null) { if (responseMessage != null) {
call.cancelWithStatus(Status.INTERNAL, 'Too many responses received'); call.cancelWithStatus(Status.INTERNAL, 'Too many responses received');
} }
try { responseMessage = message;
responseMessage = deserialize(message);
} catch (e) {
call.cancelWithStatus(
Status.INTERNAL,
'Failed to parse server response'
);
}
call.startRead(); call.startRead();
}, },
onReceiveStatus(status: StatusObject) { onReceiveStatus(status: StatusObject) {
@ -286,13 +302,20 @@ export class Client {
({ metadata, options, callback } = this.checkOptionalUnaryResponseArguments< ({ metadata, options, callback } = this.checkOptionalUnaryResponseArguments<
ResponseType ResponseType
>(metadata, options, callback)); >(metadata, options, callback));
const call: Call = this[CHANNEL_SYMBOL].createCall( const methodDefinition: ClientMethodDefinition<RequestType, ResponseType> = {
method, path: method,
options.deadline, requestStream: true,
options.host, responseStream: false,
null, requestSerialize: serialize,
options.propagate_flags responseDeserialize: deserialize
); };
const interceptorArgs: InterceptorArguments = {
clientInterceptors: this[INTERCEPTOR_SYMBOL],
clientInterceptorProviders: this[INTERCEPTOR_PROVIDER_SYMBOL],
callInterceptors: options.interceptors || [],
callInterceptorProviders: options.interceptor_providers || []
};
const call: Call = getInterceptingCall(interceptorArgs, methodDefinition, options, this[CHANNEL_SYMBOL]);
if (options.credentials) { if (options.credentials) {
call.setCredentials(options.credentials); call.setCredentials(options.credentials);
} }
@ -302,18 +325,11 @@ export class Client {
onReceiveMetadata: (metadata) => { onReceiveMetadata: (metadata) => {
emitter.emit('metadata', metadata); emitter.emit('metadata', metadata);
}, },
onReceiveMessage(message: Buffer) { onReceiveMessage(message: any) {
if (responseMessage != null) { if (responseMessage != null) {
call.cancelWithStatus(Status.INTERNAL, 'Too many responses received'); call.cancelWithStatus(Status.INTERNAL, 'Too many responses received');
} }
try { responseMessage = message;
responseMessage = deserialize(message);
} catch (e) {
call.cancelWithStatus(
Status.INTERNAL,
'Failed to parse server response'
);
}
call.startRead(); call.startRead();
}, },
onReceiveStatus(status: StatusObject) { onReceiveStatus(status: StatusObject) {
@ -377,32 +393,31 @@ export class Client {
options?: CallOptions options?: CallOptions
): ClientReadableStream<ResponseType> { ): ClientReadableStream<ResponseType> {
({ metadata, options } = this.checkMetadataAndOptions(metadata, options)); ({ metadata, options } = this.checkMetadataAndOptions(metadata, options));
const call: Call = this[CHANNEL_SYMBOL].createCall( const methodDefinition: ClientMethodDefinition<RequestType, ResponseType> = {
method, path: method,
options.deadline, requestStream: false,
options.host, responseStream: true,
null, requestSerialize: serialize,
options.propagate_flags responseDeserialize: deserialize
); };
const interceptorArgs: InterceptorArguments = {
clientInterceptors: this[INTERCEPTOR_SYMBOL],
clientInterceptorProviders: this[INTERCEPTOR_PROVIDER_SYMBOL],
callInterceptors: options.interceptors || [],
callInterceptorProviders: options.interceptor_providers || []
};
const call: Call = getInterceptingCall(interceptorArgs, methodDefinition, options, this[CHANNEL_SYMBOL]);
if (options.credentials) { if (options.credentials) {
call.setCredentials(options.credentials); call.setCredentials(options.credentials);
} }
const message: Buffer = serialize(argument); const writeObj: WriteObject = { message: argument };
const writeObj: WriteObject = { message };
const stream = new ClientReadableStreamImpl<ResponseType>(call, deserialize); const stream = new ClientReadableStreamImpl<ResponseType>(call, deserialize);
call.start(metadata, { call.start(metadata, {
onReceiveMetadata(metadata: Metadata) { onReceiveMetadata(metadata: Metadata) {
stream.emit('metadata', metadata); stream.emit('metadata', metadata);
}, },
onReceiveMessage(message: Buffer) { onReceiveMessage(message: any) {
let deserialized: ResponseType; if (stream.push(message)) {
try {
deserialized = deserialize(message);
} catch (e) {
call.cancelWithStatus(Status.INTERNAL, 'Failed to parse server response');
return;
}
if (stream.push(deserialized)) {
call.startRead(); call.startRead();
} }
}, },
@ -439,13 +454,20 @@ export class Client {
options?: CallOptions options?: CallOptions
): ClientDuplexStream<RequestType, ResponseType> { ): ClientDuplexStream<RequestType, ResponseType> {
({ metadata, options } = this.checkMetadataAndOptions(metadata, options)); ({ metadata, options } = this.checkMetadataAndOptions(metadata, options));
const call: Call = this[CHANNEL_SYMBOL].createCall( const methodDefinition: ClientMethodDefinition<RequestType, ResponseType> = {
method, path: method,
options.deadline, requestStream: true,
options.host, responseStream: true,
null, requestSerialize: serialize,
options.propagate_flags responseDeserialize: deserialize
); };
const interceptorArgs: InterceptorArguments = {
clientInterceptors: this[INTERCEPTOR_SYMBOL],
clientInterceptorProviders: this[INTERCEPTOR_PROVIDER_SYMBOL],
callInterceptors: options.interceptors || [],
callInterceptorProviders: options.interceptor_providers || []
};
const call: Call = getInterceptingCall(interceptorArgs, methodDefinition, options, this[CHANNEL_SYMBOL]);
if (options.credentials) { if (options.credentials) {
call.setCredentials(options.credentials); call.setCredentials(options.credentials);
} }
@ -459,14 +481,7 @@ export class Client {
stream.emit('metadata', metadata); stream.emit('metadata', metadata);
}, },
onReceiveMessage(message: Buffer) { onReceiveMessage(message: Buffer) {
let deserialized: ResponseType; if (stream.push(message)) {
try {
deserialized = deserialize(message);
} catch (e) {
call.cancelWithStatus(Status.INTERNAL, 'Failed to parse server response');
return;
}
if (stream.push(deserialized)) {
call.startRead(); call.startRead();
} }
}, },

View File

@ -252,19 +252,6 @@ export type Call =
| ClientDuplexStream<any, any>; | ClientDuplexStream<any, any>;
/* tslint:enable:no-any */ /* tslint:enable:no-any */
export type MetadataListener = (metadata: Metadata, next: Function) => void;
// tslint:disable-next-line:no-any
export type MessageListener = (message: any, next: Function) => void;
export type StatusListener = (status: StatusObject, next: Function) => void;
export interface Listener {
onReceiveMetadata?: MetadataListener;
onReceiveMessage?: MessageListener;
onReceiveStatus?: StatusListener;
}
/**** Unimplemented function stubs ****/ /**** Unimplemented function stubs ****/
/* tslint:disable:no-any variable-name */ /* tslint:disable:no-any variable-name */
@ -299,17 +286,16 @@ export const getClientChannel = (client: Client) => {
export { StatusBuilder }; export { StatusBuilder };
export const ListenerBuilder = () => { export { Listener } from './call-stream';
throw new Error('Not yet implemented');
};
export const InterceptorBuilder = () => { export {
throw new Error('Not yet implemented'); Requester,
}; ListenerBuilder,
RequesterBuilder,
export const InterceptingCall = () => { Interceptor,
throw new Error('Not yet implemented'); InterceptorProvider,
}; InterceptingCall,
InterceptorConfigurationError } from './client-interceptors';
export { GrpcObject } from './make-client'; export { GrpcObject } from './make-client';

View File

@ -27,17 +27,27 @@ export interface Deserialize<T> {
(bytes: Buffer): T; (bytes: Buffer): T;
} }
export interface MethodDefinition<RequestType, ResponseType> { export interface ClientMethodDefinition<RequestType, ResponseType> {
path: string; path: string;
requestStream: boolean; requestStream: boolean;
responseStream: boolean; responseStream: boolean;
requestSerialize: Serialize<RequestType>; requestSerialize: Serialize<RequestType>;
responseSerialize: Serialize<ResponseType>;
requestDeserialize: Deserialize<RequestType>;
responseDeserialize: Deserialize<ResponseType>; responseDeserialize: Deserialize<ResponseType>;
originalName?: string; originalName?: string;
} }
export interface ServerMethodDefinition<RequestType, ResponseType> {
path: string;
requestStream: boolean;
responseStream: boolean;
responseSerialize: Serialize<ResponseType>;
requestDeserialize: Deserialize<RequestType>;
originalName?: string;
}
export interface MethodDefinition<RequestType, ResponseType> extends ClientMethodDefinition<RequestType, ResponseType>, ServerMethodDefinition<RequestType, ResponseType> {
}
export interface ServiceDefinition { export interface ServiceDefinition {
[index: string]: MethodDefinition<any, any>; [index: string]: MethodDefinition<any, any>;
} }

@ -1 +1 @@
Subproject commit f703e1c86c1504d9e48953f8da31f842679b7775 Subproject commit e6224f8fd9e4903238cebf22b9d78d09e52c378c

File diff suppressed because it is too large Load Diff