From 89e47c84f791d48b0910b0f2d19299a9d5b28be3 Mon Sep 17 00:00:00 2001 From: murgatroid99 Date: Wed, 22 Aug 2018 16:52:24 -0700 Subject: [PATCH] Pure JS: Implement public Channel API --- PACKAGE-COMPARISON.md | 4 +- README.md | 2 +- packages/grpc-js-core/README.md | 9 ++ .../src/call-credentials-filter.ts | 4 +- packages/grpc-js-core/src/call-stream.ts | 24 +++- packages/grpc-js-core/src/call.ts | 22 +-- packages/grpc-js-core/src/channel.ts | 116 ++++++++++++---- packages/grpc-js-core/src/client.ts | 125 +++++++++++------- .../grpc-js-core/src/compression-filter.ts | 4 +- packages/grpc-js-core/src/deadline-filter.ts | 34 +++-- packages/grpc-js-core/src/filter-stack.ts | 4 +- packages/grpc-js-core/src/filter.ts | 4 +- packages/grpc-js-core/src/index.ts | 10 +- packages/grpc-js-core/src/make-client.ts | 2 +- .../src/metadata-status-filter.ts | 4 +- packages/grpc-js-core/src/subchannel.ts | 4 +- .../grpc-js-core/test/test-call-stream.ts | 28 ++-- 17 files changed, 265 insertions(+), 135 deletions(-) diff --git a/PACKAGE-COMPARISON.md b/PACKAGE-COMPARISON.md index 3396eac1..2b32adaf 100644 --- a/PACKAGE-COMPARISON.md +++ b/PACKAGE-COMPARISON.md @@ -34,4 +34,6 @@ In addition, all channel arguments defined in [this header file](https://github. - `grpc.secondary_user_agent` - `grpc.default_authority` - `grpc.keepalive_time_ms` - - `grpc.keepalive_timeout_ms` \ No newline at end of file + - `grpc.keepalive_timeout_ms` + - `channelOverride` + - `channelFactoryOverride` \ No newline at end of file diff --git a/README.md b/README.md index a9164139..dd9ce972 100644 --- a/README.md +++ b/README.md @@ -3,7 +3,7 @@ ## Implementations -For a comparison of the features available in these two libraries, see [this document](https://github.com/grpc/grpc-node/tree/master/PACKGE-COMPARISON.md) +For a comparison of the features available in these two libraries, see [this document](https://github.com/grpc/grpc-node/tree/master/PACKAGE-COMPARISON.md) ### C-based Client and Server diff --git a/packages/grpc-js-core/README.md b/packages/grpc-js-core/README.md index 9393ffc1..647121b8 100644 --- a/packages/grpc-js-core/README.md +++ b/packages/grpc-js-core/README.md @@ -18,5 +18,14 @@ npm install @grpc/grpc-js - TLS channel credentials - Call credentials (for auth) - Simple reconnection + - Channel API This library does not directly handle `.proto` files. To use `.proto` files with this library we recommend using the `@grpc/proto-loader` package. + +## Some Notes on API Guarantees + +The public API of this library follows semantic versioning, with some caveats: + + - Some methods are prefixed with an underscore. These methods are internal and should not be considered part of the public API. + - The class `Call` is only exposed due to limitations of TypeScript. It should not be considered part of the public API. + - In general, any API that is exposed by this library but is not exposed by the `grpc` library is likely an error and should not be considered part of the public API. diff --git a/packages/grpc-js-core/src/call-credentials-filter.ts b/packages/grpc-js-core/src/call-credentials-filter.ts index f25d70bc..66515fdd 100644 --- a/packages/grpc-js-core/src/call-credentials-filter.ts +++ b/packages/grpc-js-core/src/call-credentials-filter.ts @@ -1,7 +1,7 @@ import {promisify} from 'util'; import {CallCredentials} from './call-credentials'; -import {CallStream} from './call-stream'; +import {Call} from './call-stream'; import {Http2Channel} from './channel'; import {BaseFilter, Filter, FilterFactory} from './filter'; import {Metadata} from './metadata'; @@ -41,7 +41,7 @@ export class CallCredentialsFilterFactory implements this.credentials = channel.credentials.getCallCredentials(); } - createFilter(callStream: CallStream): CallCredentialsFilter { + createFilter(callStream: Call): CallCredentialsFilter { return new CallCredentialsFilter( this.credentials.compose(callStream.getCredentials()), callStream.getHost(), callStream.getMethod()); diff --git a/packages/grpc-js-core/src/call-stream.ts b/packages/grpc-js-core/src/call-stream.ts index 1fbfb023..87b38c45 100644 --- a/packages/grpc-js-core/src/call-stream.ts +++ b/packages/grpc-js-core/src/call-stream.ts @@ -8,6 +8,8 @@ import {Filter} from './filter'; import {FilterStackFactory} from './filter-stack'; import {Metadata} from './metadata'; import {ObjectDuplex, WriteCallback} from './object-stream'; +import { Meta } from 'orchestrator'; +import { Channel, Http2Channel } from './channel'; const {HTTP2_HEADER_STATUS, HTTP2_HEADER_CONTENT_TYPE, NGHTTP2_CANCEL} = http2.constants; @@ -16,12 +18,12 @@ export type Deadline = Date|number; export interface CallStreamOptions { deadline: Deadline; - credentials: CallCredentials; flags: number; host: string; + parentCall: Call | null; } -export type CallOptions = Partial; +export type PartialCallStreamOptions = Partial; export interface StatusObject { code: Status; @@ -43,11 +45,13 @@ export interface WriteObject { /** * This interface represents a duplex stream associated with a single gRPC call. */ -export type CallStream = { +export type Call = { cancelWithStatus(status: Status, details: string): void; getPeer(): string; + sendMetadata(metadata: Metadata): void; getDeadline(): Deadline; getCredentials(): CallCredentials; + setCredentials(credentials: CallCredentials): void; /* If the return value is null, the call has not ended yet. Otherwise, it has * ended with the specified status */ getStatus(): StatusObject | null; @@ -65,7 +69,8 @@ enum ReadState { const emptyBuffer = Buffer.alloc(0); -export class Http2CallStream extends Duplex implements CallStream { +export class Http2CallStream extends Duplex implements Call { + credentials: CallCredentials = CallCredentials.createEmpty(); filterStack: Filter; private statusEmitted = false; private http2Stream: http2.ClientHttp2Stream|null = null; @@ -103,6 +108,7 @@ export class Http2CallStream extends Duplex implements CallStream { constructor( private readonly methodName: string, + private readonly channel: Http2Channel, private readonly options: CallStreamOptions, filterStackFactory: FilterStackFactory) { super({objectMode: true}); @@ -377,6 +383,10 @@ export class Http2CallStream extends Duplex implements CallStream { } } + sendMetadata(metadata: Metadata): void { + this.channel._startHttp2Stream(this.options.host, this.methodName, this, metadata); + } + private destroyHttp2Stream() { // The http2 stream could already have been destroyed if cancelWithStatus // is called in response to an internal http2 error. @@ -402,7 +412,11 @@ export class Http2CallStream extends Duplex implements CallStream { } getCredentials(): CallCredentials { - return this.options.credentials; + return this.credentials; + } + + setCredentials(credentials: CallCredentials): void { + this.credentials = credentials; } getStatus(): StatusObject|null { diff --git a/packages/grpc-js-core/src/call.ts b/packages/grpc-js-core/src/call.ts index fc72d358..9c864d8c 100644 --- a/packages/grpc-js-core/src/call.ts +++ b/packages/grpc-js-core/src/call.ts @@ -2,7 +2,7 @@ import {EventEmitter} from 'events'; import * as _ from 'lodash'; import {Duplex, Readable, Writable} from 'stream'; -import {CallStream, StatusObject, WriteObject} from './call-stream'; +import {Call, StatusObject, WriteObject} from './call-stream'; import {Status} from './constants'; import {EmitterAugmentation1} from './events'; import {Metadata} from './metadata'; @@ -16,7 +16,7 @@ export type ServiceError = StatusObject&Error; /** * A base type for all user-facing values returned by client-side method calls. */ -export type Call = { +export type SurfaceCall = { cancel(): void; getPeer(): string; }&EmitterAugmentation1<'metadata', Metadata>& EmitterAugmentation1<'status', StatusObject>&EventEmitter; @@ -24,21 +24,21 @@ export type Call = { /** * A type representing the return value of a unary method call. */ -export type ClientUnaryCall = Call; +export type ClientUnaryCall = SurfaceCall; /** * A type representing the return value of a server stream method call. */ export type ClientReadableStream = { deserialize: (chunk: Buffer) => ResponseType; -}&Call&ObjectReadable; +}&SurfaceCall&ObjectReadable; /** * A type representing the return value of a client stream method call. */ export type ClientWritableStream = { serialize: (value: RequestType) => Buffer; -}&Call&ObjectWritable; +}&SurfaceCall&ObjectWritable; /** * A type representing the return value of a bidirectional stream method call. @@ -48,7 +48,7 @@ export type ClientDuplexStream = export class ClientUnaryCallImpl extends EventEmitter implements ClientUnaryCall { - constructor(private readonly call: CallStream) { + constructor(private readonly call: Call) { super(); call.on('metadata', (metadata: Metadata) => { this.emit('metadata', metadata); @@ -68,7 +68,7 @@ export class ClientUnaryCallImpl extends EventEmitter implements } function setUpReadableStream( - stream: ClientReadableStream, call: CallStream, + stream: ClientReadableStream, call: Call, deserialize: (chunk: Buffer) => ResponseType): void { call.on('data', (data: Buffer) => { let deserialized: ResponseType; @@ -101,7 +101,7 @@ function setUpReadableStream( export class ClientReadableStreamImpl extends Readable implements ClientReadableStream { constructor( - private readonly call: CallStream, + private readonly call: Call, readonly deserialize: (chunk: Buffer) => ResponseType) { super({objectMode: true}); call.on('metadata', (metadata: Metadata) => { @@ -124,7 +124,7 @@ export class ClientReadableStreamImpl extends Readable implements } function tryWrite( - call: CallStream, serialize: (value: RequestType) => Buffer, + call: Call, serialize: (value: RequestType) => Buffer, chunk: RequestType, encoding: string, cb: Function) { let message: Buffer; const flags: number = Number(encoding); @@ -145,7 +145,7 @@ function tryWrite( export class ClientWritableStreamImpl extends Writable implements ClientWritableStream { constructor( - private readonly call: CallStream, + private readonly call: Call, readonly serialize: (value: RequestType) => Buffer) { super({objectMode: true}); call.on('metadata', (metadata: Metadata) => { @@ -177,7 +177,7 @@ export class ClientWritableStreamImpl extends Writable implements export class ClientDuplexStreamImpl extends Duplex implements ClientDuplexStream { constructor( - private readonly call: CallStream, + private readonly call: Call, readonly serialize: (value: RequestType) => Buffer, readonly deserialize: (chunk: Buffer) => ResponseType) { super({objectMode: true}); diff --git a/packages/grpc-js-core/src/channel.ts b/packages/grpc-js-core/src/channel.ts index a27ef8f2..a14c5656 100644 --- a/packages/grpc-js-core/src/channel.ts +++ b/packages/grpc-js-core/src/channel.ts @@ -5,7 +5,7 @@ import * as url from 'url'; import {CallCredentials} from './call-credentials'; import {CallCredentialsFilterFactory} from './call-credentials-filter'; -import {CallOptions, CallStream, CallStreamOptions, Http2CallStream} from './call-stream'; +import {PartialCallStreamOptions, Call, CallStreamOptions, Http2CallStream, Deadline} from './call-stream'; import {ChannelCredentials} from './channel-credentials'; import {CompressionFilterFactory} from './compression-filter'; import {Status} from './constants'; @@ -55,22 +55,48 @@ function uniformRandom(min: number, max: number) { * An interface that represents a communication channel to a server specified * by a given address. */ -export interface Channel extends EventEmitter { - createStream(methodName: string, metadata: Metadata, options: CallOptions): - CallStream; - connect(): Promise; - getConnectivityState(): ConnectivityState; +export interface Channel { + /** + * Close the channel. This has the same functionality as the existing grpc.Client.prototype.close + */ close(): void; - - /* tslint:disable:no-any */ - addListener(event: string, listener: Function): this; - emit(event: string|symbol, ...args: any[]): boolean; - on(event: string, listener: Function): this; - once(event: string, listener: Function): this; - prependListener(event: string, listener: Function): this; - prependOnceListener(event: string, listener: Function): this; - removeListener(event: string, listener: Function): this; - /* tslint:enable:no-any */ + /** + * Return the target that this channel connects to + */ + getTarget(): string; + /** + * Get the channel's current connectivity state. This method is here mainly + * because it is in the existing internal Channel class, and there isn't + * another good place to put it. + * @param tryToConnect If true, the channel will start connecting if it is + * idle. Otherwise, idle channels will only start connecting when a + * call starts. + */ + getConnectivityState(tryToConnect: boolean): ConnectivityState; + /** + * Watch for connectivity state changes. This is also here mainly because + * it is in the existing external Channel class. + * @param currentState The state to watch for transitions from. This should + * always be populated by calling getConnectivityState immediately + * before. + * @param deadline A deadline for waiting for a state change + * @param callback Called with no error when a state change, or with an + * error if the deadline passes without a state change. + */ + watchConnectivityState(currentState: ConnectivityState, deadline: Date|number, callback: (error?: Error) => void): void; + /** + * Create a call object. Call is an opaque type that is used by the Client + * class. This function is called by the gRPC library when starting a + * request. Implementers should return an instance of Call that is returned + * from calling createCall on an instance of the provided Channel class. + * @param method The full method string to request. + * @param deadline The call deadline + * @param host A host string override for making the request + * @param parentCall A server call to propagate some information from + * @param propagateFlags A bitwise combination of elements of grpc.propagate + * that indicates what information to propagate from parentCall. + */ + createCall(method: string, deadline: Deadline|null|undefined, host: string|null|undefined, parentCall: Call|null|undefined, propagateFlags: number|null|undefined): Call; } export class Http2Channel extends EventEmitter implements Channel { @@ -234,7 +260,7 @@ export class Http2Channel extends EventEmitter implements Channel { ].filter(e => e).join(' '); // remove falsey values first } - private startHttp2Stream( + _startHttp2Stream( authority: string, methodName: string, stream: Http2CallStream, metadata: Metadata) { const finalMetadata: Promise = @@ -255,7 +281,7 @@ export class Http2Channel extends EventEmitter implements Channel { /* In this case, we lost the connection while finalizing * metadata. That should be very unusual */ setImmediate(() => { - this.startHttp2Stream(authority, methodName, stream, metadata); + this._startHttp2Stream(authority, methodName, stream, metadata); }); } }) @@ -268,20 +294,19 @@ export class Http2Channel extends EventEmitter implements Channel { }); } - createStream(methodName: string, metadata: Metadata, options: CallOptions): - CallStream { + createCall(method: string, deadline: Deadline|null|undefined, host: string|null|undefined, parentCall: Call|null|undefined, propagateFlags: number|null|undefined): + Call { if (this.connectivityState === ConnectivityState.SHUTDOWN) { throw new Error('Channel has been shut down'); } const finalOptions: CallStreamOptions = { - deadline: options.deadline === undefined ? Infinity : options.deadline, - credentials: options.credentials || CallCredentials.createEmpty(), - flags: options.flags || 0, - host: options.host || this.defaultAuthority + deadline: (deadline === null || deadline == undefined) ? Infinity : deadline, + flags: propagateFlags || 0, + host: host || this.defaultAuthority, + parentCall: parentCall || null }; const stream: Http2CallStream = - new Http2CallStream(methodName, finalOptions, this.filterStackFactory); - this.startHttp2Stream(finalOptions.host, methodName, stream, metadata); + new Http2CallStream(method, this, finalOptions, this.filterStackFactory); return stream; } @@ -289,7 +314,7 @@ export class Http2Channel extends EventEmitter implements Channel { * Attempts to connect, returning a Promise that resolves when the connection * is successful, or rejects if the channel is shut down. */ - connect(): Promise { + private connect(): Promise { if (this.connectivityState === ConnectivityState.READY) { return Promise.resolve(); } else if (this.connectivityState === ConnectivityState.SHUTDOWN) { @@ -320,10 +345,45 @@ export class Http2Channel extends EventEmitter implements Channel { } } - getConnectivityState(): ConnectivityState { + getConnectivityState(tryToConnect: boolean): ConnectivityState { + if (tryToConnect) { + this.transitionToState([ConnectivityState.IDLE], ConnectivityState.CONNECTING); + } return this.connectivityState; } + watchConnectivityState(currentState: ConnectivityState, deadline: Date|number, callback: (error?: Error)=>void) { + if (this.connectivityState !== currentState) { + /* If the connectivity state is different from the provided currentState, + * we assume that a state change has successfully occurred */ + setImmediate(callback); + } else { + let deadlineMs: number = 0; + if (deadline instanceof Date) { + deadlineMs = deadline.getTime(); + } else { + deadlineMs = deadline; + } + let timeout: number = deadlineMs - Date.now(); + if (timeout < 0) { + timeout = 0; + } + let timeoutId = setTimeout(() => { + this.removeListener('connectivityStateChanged', eventCb); + callback(new Error('Channel state did not change before deadline')); + }, timeout); + let eventCb = () => { + clearTimeout(timeoutId); + callback(); + }; + this.once('connectivityStateChanged', eventCb); + } + } + + getTarget() { + return this.target.toString(); + } + close() { if (this.connectivityState === ConnectivityState.SHUTDOWN) { throw new Error('Channel has been shut down'); diff --git a/packages/grpc-js-core/src/client.ts b/packages/grpc-js-core/src/client.ts index 55f63f16..7bfc19bb 100644 --- a/packages/grpc-js-core/src/client.ts +++ b/packages/grpc-js-core/src/client.ts @@ -2,12 +2,13 @@ import {once} from 'lodash'; import {URL} from 'url'; import {ClientDuplexStream, ClientDuplexStreamImpl, ClientReadableStream, ClientReadableStreamImpl, ClientUnaryCall, ClientUnaryCallImpl, ClientWritableStream, ClientWritableStreamImpl, ServiceError} from './call'; -import {CallOptions, CallStream, StatusObject, WriteObject} from './call-stream'; -import {Channel, Http2Channel} from './channel'; +import {PartialCallStreamOptions, Call, StatusObject, WriteObject, Deadline} from './call-stream'; +import {Channel, Http2Channel, ConnectivityState} from './channel'; import {ChannelCredentials} from './channel-credentials'; import {Status} from './constants'; import {Metadata} from './metadata'; import {ChannelOptions} from './channel-options'; +import { CallCredentials } from './call-credentials'; // This symbol must be exported (for now). // See: https://github.com/Microsoft/TypeScript/issues/20080 @@ -17,6 +18,19 @@ export interface UnaryCallback { (err: ServiceError|null, value?: ResponseType): void; } +export interface CallOptions { + deadline?: Deadline, + host?: string, + parent?: Call, + propagate_flags?: number, + credentials?: CallCredentials +} + +export type ClientOptions = Partial & { + channelOverride?: Channel, + channelFactoryOverride?: (address: string, credentials: ChannelCredentials, options: ClientOptions) => Channel +}; + /** * A generic gRPC client. Primarily useful as a base class for all generated * clients. @@ -25,52 +39,53 @@ export class Client { private readonly[kChannel]: Channel; constructor( address: string, credentials: ChannelCredentials, - options: Partial = {}) { - this[kChannel] = new Http2Channel(address, credentials, options); + options: ClientOptions = {}) { + if (options.channelOverride) { + this[kChannel] = options.channelOverride; + } else if (options.channelFactoryOverride) { + this[kChannel] = options.channelFactoryOverride(address, credentials, options); + } else { + this[kChannel] = new Http2Channel(address, credentials, options); + } } close(): void { this[kChannel].close(); } - waitForReady(deadline: Date|number, callback: (error: Error|null) => void): - void { - const cb: (error: Error|null) => void = once(callback); - const callbackCalled = false; - let timer: NodeJS.Timer|null = null; - this[kChannel].connect().then( - () => { - if (timer) { - clearTimeout(timer); + getChannel(): Channel { + return this[kChannel]; + } + + waitForReady(deadline: Deadline, callback: (error?: Error) => void): + void { + const checkState = (err?: Error) => { + if (err) { + callback(new Error('Failed to connect before the deadline')); + return; + } + var new_state; + try { + new_state = this[kChannel].getConnectivityState(true); + } catch (e) { + callback(new Error('The channel has been closed')); + return; + } + if (new_state === ConnectivityState.READY) { + callback(); + } else { + try { + this[kChannel].watchConnectivityState(new_state, deadline, checkState); + } catch (e) { + callback(new Error('The channel has been closed')); } - cb(null); - }, - (err: Error) => { - // Rejection occurs if channel is shut down first. - if (timer) { - clearTimeout(timer); - } - cb(err); - }); - if (deadline !== Infinity) { - let timeout: number; - const now: number = (new Date()).getTime(); - if (deadline instanceof Date) { - timeout = deadline.getTime() - now; - } else { - timeout = deadline - now; - } - if (timeout < 0) { - timeout = 0; - } - timer = setTimeout(() => { - cb(new Error('Failed to connect before the deadline')); - }, timeout); - } + } + }; + setImmediate(checkState); } private handleUnaryResponse( - call: CallStream, deserialize: (value: Buffer) => ResponseType, + call: Call, deserialize: (value: Buffer) => ResponseType, callback: UnaryCallback): void { let responseMessage: ResponseType|null = null; call.on('data', (data: Buffer) => { @@ -157,11 +172,14 @@ export class Client { ({metadata, options, callback} = this.checkOptionalUnaryResponseArguments( metadata, options, callback)); - const call: CallStream = - this[kChannel].createStream(method, metadata, options); + const call: Call = + this[kChannel].createCall(method, options.deadline, options.host, options.parent, options.propagate_flags); + if (options.credentials) { + call.setCredentials(options.credentials); + } const message: Buffer = serialize(argument); const writeObj: WriteObject = {message}; - writeObj.flags = options.flags; + call.sendMetadata(metadata); call.write(writeObj); call.end(); this.handleUnaryResponse(call, deserialize, callback); @@ -195,8 +213,12 @@ export class Client { ({metadata, options, callback} = this.checkOptionalUnaryResponseArguments( metadata, options, callback)); - const call: CallStream = - this[kChannel].createStream(method, metadata, options); + const call: Call = + this[kChannel].createCall(method, options.deadline, options.host, options.parent, options.propagate_flags); + if (options.credentials) { + call.setCredentials(options.credentials); + } + call.sendMetadata(metadata); this.handleUnaryResponse(call, deserialize, callback); return new ClientWritableStreamImpl(call, serialize); } @@ -239,11 +261,14 @@ export class Client { metadata?: Metadata|CallOptions, options?: CallOptions): ClientReadableStream { ({metadata, options} = this.checkMetadataAndOptions(metadata, options)); - const call: CallStream = - this[kChannel].createStream(method, metadata, options); + const call: Call = + this[kChannel].createCall(method, options.deadline, options.host, options.parent, options.propagate_flags); + if (options.credentials) { + call.setCredentials(options.credentials); + } const message: Buffer = serialize(argument); const writeObj: WriteObject = {message}; - writeObj.flags = options.flags; + call.sendMetadata(metadata); call.write(writeObj); call.end(); return new ClientReadableStreamImpl(call, deserialize); @@ -263,8 +288,12 @@ export class Client { metadata?: Metadata|CallOptions, options?: CallOptions): ClientDuplexStream { ({metadata, options} = this.checkMetadataAndOptions(metadata, options)); - const call: CallStream = - this[kChannel].createStream(method, metadata, options); + const call: Call = + this[kChannel].createCall(method, options.deadline, options.host, options.parent, options.propagate_flags); + if (options.credentials) { + call.setCredentials(options.credentials); + } + call.sendMetadata(metadata); return new ClientDuplexStreamImpl( call, serialize, deserialize); } diff --git a/packages/grpc-js-core/src/compression-filter.ts b/packages/grpc-js-core/src/compression-filter.ts index dd607bae..d694f239 100644 --- a/packages/grpc-js-core/src/compression-filter.ts +++ b/packages/grpc-js-core/src/compression-filter.ts @@ -1,6 +1,6 @@ import * as zlib from 'zlib'; -import {CallStream, WriteFlags, WriteObject} from './call-stream'; +import {Call, WriteFlags, WriteObject} from './call-stream'; import {Channel} from './channel'; import {Status} from './constants'; import {BaseFilter, Filter, FilterFactory} from './filter'; @@ -189,7 +189,7 @@ export class CompressionFilter extends BaseFilter implements Filter { export class CompressionFilterFactory implements FilterFactory { constructor(private readonly channel: Channel) {} - createFilter(callStream: CallStream): CompressionFilter { + createFilter(callStream: Call): CompressionFilter { return new CompressionFilter(); } } diff --git a/packages/grpc-js-core/src/deadline-filter.ts b/packages/grpc-js-core/src/deadline-filter.ts index 2424039e..0e288608 100644 --- a/packages/grpc-js-core/src/deadline-filter.ts +++ b/packages/grpc-js-core/src/deadline-filter.ts @@ -1,5 +1,5 @@ -import {CallStream} from './call-stream'; -import {Channel, Http2Channel} from './channel'; +import {Call} from './call-stream'; +import {Channel, Http2Channel, ConnectivityState} from './channel'; import {Status} from './constants'; import {BaseFilter, Filter, FilterFactory} from './filter'; import {Metadata} from './metadata'; @@ -24,7 +24,7 @@ export class DeadlineFilter extends BaseFilter implements Filter { private deadline: number; constructor( private readonly channel: Http2Channel, - private readonly callStream: CallStream) { + private readonly callStream: Call) { super(); const callDeadline = callStream.getDeadline(); if (callDeadline instanceof Date) { @@ -46,22 +46,34 @@ export class DeadlineFilter extends BaseFilter implements Filter { } } - async sendMetadata(metadata: Promise) { + sendMetadata(metadata: Promise) { if (this.deadline === Infinity) { - return await metadata; + return metadata; } - await this.channel.connect(); - const timeoutString = getDeadline(this.deadline); - const finalMetadata = await metadata; - finalMetadata.set('grpc-timeout', timeoutString); - return finalMetadata; + return new Promise((resolve, reject) => { + if (this.channel.getConnectivityState(false) === ConnectivityState.READY) { + resolve(metadata); + } else { + const handleStateChange = (newState: ConnectivityState) => { + if (newState === ConnectivityState.READY) { + resolve(metadata); + this.channel.removeListener('connectivityStateChanged', handleStateChange); + } + }; + this.channel.on('connectivityStateChanged', handleStateChange); + } + }).then((finalMetadata: Metadata) => { + const timeoutString = getDeadline(this.deadline); + finalMetadata.set('grpc-timeout', timeoutString); + return finalMetadata; + }); } } export class DeadlineFilterFactory implements FilterFactory { constructor(private readonly channel: Http2Channel) {} - createFilter(callStream: CallStream): DeadlineFilter { + createFilter(callStream: Call): DeadlineFilter { return new DeadlineFilter(this.channel, callStream); } } diff --git a/packages/grpc-js-core/src/filter-stack.ts b/packages/grpc-js-core/src/filter-stack.ts index 78e84486..360a5cfc 100644 --- a/packages/grpc-js-core/src/filter-stack.ts +++ b/packages/grpc-js-core/src/filter-stack.ts @@ -1,6 +1,6 @@ import {flow, flowRight, map} from 'lodash'; -import {CallStream, StatusObject, WriteObject} from './call-stream'; +import {Call, StatusObject, WriteObject} from './call-stream'; import {Filter, FilterFactory} from './filter'; import {Metadata} from './metadata'; @@ -37,7 +37,7 @@ export class FilterStack implements Filter { export class FilterStackFactory implements FilterFactory { constructor(private readonly factories: Array>) {} - createFilter(callStream: CallStream): FilterStack { + createFilter(callStream: Call): FilterStack { return new FilterStack( map(this.factories, (factory) => factory.createFilter(callStream))); } diff --git a/packages/grpc-js-core/src/filter.ts b/packages/grpc-js-core/src/filter.ts index a2da1760..28b05df8 100644 --- a/packages/grpc-js-core/src/filter.ts +++ b/packages/grpc-js-core/src/filter.ts @@ -1,4 +1,4 @@ -import {CallStream, StatusObject, WriteObject} from './call-stream'; +import {Call, StatusObject, WriteObject} from './call-stream'; import {Metadata} from './metadata'; /** @@ -40,5 +40,5 @@ export abstract class BaseFilter { } export interface FilterFactory { - createFilter(callStream: CallStream): T; + createFilter(callStream: Call): T; } diff --git a/packages/grpc-js-core/src/index.ts b/packages/grpc-js-core/src/index.ts index 69e2b0c0..6372f22b 100644 --- a/packages/grpc-js-core/src/index.ts +++ b/packages/grpc-js-core/src/index.ts @@ -7,6 +7,7 @@ import {Client} from './client'; import {Status} from './constants'; import {loadPackageDefinition, makeClientConstructor} from './make-client'; import {Metadata} from './metadata'; +import { Channel } from './channel'; interface IndexedObject { [key: string]: any; @@ -105,7 +106,8 @@ export { Client, loadPackageDefinition, makeClientConstructor, - makeClientConstructor as makeGenericClientConstructor + makeClientConstructor as makeGenericClientConstructor, + Channel }; /** @@ -116,7 +118,7 @@ export const closeClient = (client: Client) => client.close(); export const waitForClientReady = (client: Client, deadline: Date|number, - callback: (error: Error|null) => void) => + callback: (error?: Error) => void) => client.waitForReady(deadline, callback); /**** Unimplemented function stubs ****/ @@ -155,8 +157,8 @@ export const ServerCredentials = { } }; -export const getClientChannel = (client: any) => { - throw new Error('Not available in this library'); +export const getClientChannel = (client: Client) => { + return Client.prototype.getChannel.call(client); }; export const StatusBuilder = () => { diff --git a/packages/grpc-js-core/src/make-client.ts b/packages/grpc-js-core/src/make-client.ts index b3165ad8..fff339f4 100644 --- a/packages/grpc-js-core/src/make-client.ts +++ b/packages/grpc-js-core/src/make-client.ts @@ -1,6 +1,6 @@ import * as _ from 'lodash'; -import {CallOptions} from './call-stream'; +import {PartialCallStreamOptions} from './call-stream'; import {ChannelOptions} from './channel-options'; import {ChannelCredentials} from './channel-credentials'; import {Client, UnaryCallback} from './client'; diff --git a/packages/grpc-js-core/src/metadata-status-filter.ts b/packages/grpc-js-core/src/metadata-status-filter.ts index 4bb869b7..75eabea8 100644 --- a/packages/grpc-js-core/src/metadata-status-filter.ts +++ b/packages/grpc-js-core/src/metadata-status-filter.ts @@ -1,4 +1,4 @@ -import {CallStream} from './call-stream'; +import {Call} from './call-stream'; import {StatusObject} from './call-stream'; import {Channel} from './channel'; import {Status} from './constants'; @@ -31,7 +31,7 @@ export class MetadataStatusFilter extends BaseFilter implements Filter { export class MetadataStatusFilterFactory implements FilterFactory { constructor(private readonly channel: Channel) {} - createFilter(callStream: CallStream): MetadataStatusFilter { + createFilter(callStream: Call): MetadataStatusFilter { return new MetadataStatusFilter(); } } diff --git a/packages/grpc-js-core/src/subchannel.ts b/packages/grpc-js-core/src/subchannel.ts index bbce38c9..f793aebb 100644 --- a/packages/grpc-js-core/src/subchannel.ts +++ b/packages/grpc-js-core/src/subchannel.ts @@ -3,7 +3,7 @@ import * as url from 'url'; import { EventEmitter } from "events"; import { Metadata } from "./metadata"; -import { CallStream, CallOptions, Http2CallStream } from "./call-stream"; +import { Call, PartialCallStreamOptions, Http2CallStream } from "./call-stream"; import { EmitterAugmentation1, EmitterAugmentation0 } from "./events"; import { ChannelOptions } from './channel-options'; @@ -29,7 +29,7 @@ export interface SubChannel extends EventEmitter { * @param headers The headers to start the stream with * @param callStream The stream to start */ - startCallStream(metadata: Metadata, callStream: CallStream): void; + startCallStream(metadata: Metadata, callStream: Call): void; close(): void; } diff --git a/packages/grpc-js-core/test/test-call-stream.ts b/packages/grpc-js-core/test/test-call-stream.ts index b188dbf4..902007fd 100644 --- a/packages/grpc-js-core/test/test-call-stream.ts +++ b/packages/grpc-js-core/test/test-call-stream.ts @@ -6,7 +6,7 @@ import * as stream from 'stream'; import {CallCredentials} from '../src/call-credentials'; import {Http2CallStream} from '../src/call-stream'; -import {Channel} from '../src/channel'; +import {Channel, Http2Channel} from '../src/channel'; import {CompressionFilterFactory} from '../src/compression-filter'; import {Status} from '../src/constants'; import {FilterStackFactory} from '../src/filter-stack'; @@ -81,9 +81,9 @@ class ClientHttp2StreamMock extends stream.Duplex implements describe('CallStream', () => { const callStreamArgs = { deadline: Infinity, - credentials: CallCredentials.createEmpty(), flags: 0, - host: '' + host: '', + parentCall: null }; /* A CompressionFilter is now necessary to frame and deframe messages. * Currently the channel is unused, so we can replace it with an empty object, @@ -102,7 +102,7 @@ describe('CallStream', () => { const responseMetadata = new Metadata(); responseMetadata.add('key', 'value'); const callStream = - new Http2CallStream('foo', callStreamArgs, filterStackFactory); + new Http2CallStream('foo', {}, callStreamArgs, filterStackFactory); const http2Stream = new ClientHttp2StreamMock( {payload: Buffer.alloc(0), frameLengths: []}); @@ -140,7 +140,7 @@ describe('CallStream', () => { maybeSkip(it)(`for error code ${key}`, () => { return new Promise((resolve, reject) => { const callStream = - new Http2CallStream('foo', callStreamArgs, filterStackFactory); + new Http2CallStream('foo', {}, callStreamArgs, filterStackFactory); const http2Stream = new ClientHttp2StreamMock( {payload: Buffer.alloc(0), frameLengths: []}); callStream.attachHttp2Stream(http2Stream); @@ -160,10 +160,12 @@ describe('CallStream', () => { it('should have functioning getters', (done) => { const callStream = - new Http2CallStream('foo', callStreamArgs, filterStackFactory); + new Http2CallStream('foo', {}, callStreamArgs, filterStackFactory); assert.strictEqual(callStream.getDeadline(), callStreamArgs.deadline); - assert.strictEqual(callStream.getCredentials(), callStreamArgs.credentials); assert.strictEqual(callStream.getStatus(), null); + const credentials = CallCredentials.createEmpty(); + callStream.setCredentials(credentials); + assert.strictEqual(callStream.getCredentials(), credentials); callStream.on('status', assert2.mustCall((status) => { assert.strictEqual(status.code, Status.CANCELLED); assert.strictEqual(status.details, ';)'); @@ -177,7 +179,7 @@ describe('CallStream', () => { describe('attachHttp2Stream', () => { it('should handle an empty message', (done) => { const callStream = - new Http2CallStream('foo', callStreamArgs, filterStackFactory); + new Http2CallStream('foo', {}, callStreamArgs, filterStackFactory); const http2Stream = new ClientHttp2StreamMock({payload: serialize(''), frameLengths: []}); callStream.once('data', assert2.mustCall((buffer) => { @@ -204,7 +206,7 @@ describe('CallStream', () => { it(`should handle a short message where ${testCase.description}`, (done) => { const callStream = - new Http2CallStream('foo', callStreamArgs, filterStackFactory); + new Http2CallStream('foo', {}, callStreamArgs, filterStackFactory); const http2Stream = new ClientHttp2StreamMock({ payload: serialize(message), // 21 bytes frameLengths: testCase.frameLengths @@ -234,7 +236,7 @@ describe('CallStream', () => { }].forEach((testCase: {description: string, frameLengths: number[]}) => { it(`should handle two messages where ${testCase.description}`, (done) => { const callStream = - new Http2CallStream('foo', callStreamArgs, filterStackFactory); + new Http2CallStream('foo', {}, callStreamArgs, filterStackFactory); const http2Stream = new ClientHttp2StreamMock({ payload: Buffer.concat( [serialize(message), serialize(message)]), // 42 bytes @@ -254,7 +256,7 @@ describe('CallStream', () => { it('should send buffered writes', (done) => { const callStream = - new Http2CallStream('foo', callStreamArgs, filterStackFactory); + new Http2CallStream('foo', {}, callStreamArgs, filterStackFactory); const http2Stream = new ClientHttp2StreamMock( {payload: Buffer.alloc(0), frameLengths: []}); let streamFlushed = false; @@ -277,7 +279,7 @@ describe('CallStream', () => { it('should cause data chunks in write calls afterward to be written to the given stream', (done) => { const callStream = - new Http2CallStream('foo', callStreamArgs, filterStackFactory); + new Http2CallStream('foo', {}, callStreamArgs, filterStackFactory); const http2Stream = new ClientHttp2StreamMock( {payload: Buffer.alloc(0), frameLengths: []}); http2Stream.once('write', assert2.mustCall((chunk: Buffer) => { @@ -295,7 +297,7 @@ describe('CallStream', () => { it('should handle underlying stream errors', () => { const callStream = - new Http2CallStream('foo', callStreamArgs, filterStackFactory); + new Http2CallStream('foo', {}, callStreamArgs, filterStackFactory); const http2Stream = new ClientHttp2StreamMock( {payload: Buffer.alloc(0), frameLengths: []}); callStream.once('status', assert2.mustCall((status) => {