diff --git a/package.json b/package.json index e68c401b..ad72cd67 100644 --- a/package.json +++ b/package.json @@ -14,24 +14,23 @@ }, "license": "Apache-2.0", "devDependencies": { - "@types/mocha": "^2.2.41", - "@types/node": "^8.0.19", + "@types/mocha": "^2.2.42", + "@types/node": "^8.0.25", "clang-format": "^1.0.53", "del": "^3.0.0", - "google-ts-style": "latest", + "google-ts-style": "^0.2.0", "gulp": "^3.9.1", "gulp-help": "^1.6.1", "gulp-mocha": "^4.3.1", - "gulp-sourcemaps": "^2.6.0", + "gulp-sourcemaps": "^2.6.1", "gulp-tslint": "^8.1.1", - "gulp-typescript": "^3.2.1", + "gulp-typescript": "^3.2.2", "gulp-util": "^3.0.8", - "h2-types": "git+https://github.com/kjin/node-h2-types.git", "merge2": "^1.1.0", "mocha": "^3.5.0", "through2": "^2.0.3", "tslint": "^5.5.0", - "typescript": "^2.4.1" + "typescript": "^2.5.1" }, "contributors": [ { @@ -48,9 +47,7 @@ "test": "gulp test" }, "dependencies": { - "@types/async": "^2.0.41", "@types/lodash": "^4.14.73", - "async": "^2.5.0", "lodash": "^4.17.4" } } diff --git a/src/call-credentials-filter.ts b/src/call-credentials-filter.ts index 7b10072d..a508e846 100644 --- a/src/call-credentials-filter.ts +++ b/src/call-credentials-filter.ts @@ -1,28 +1,32 @@ import {promisify} from 'util' -import {Filter} from './filter' +import {Filter, BaseFilter, FilterFactory} from './filter' import {CallCredentials} from './call-credentials' +import {Http2Channel} from './channel' +import {CallStream} from './call-stream' +import {Metadata} from './metadata' export class CallCredentialsFilter extends BaseFilter implements Filter { - private credsMetadata: Promise; - - constructor(credentials: CallCredentials) { - // TODO(murgatroid99): pass real options to generateMetadata - credsMetadata = util.promisify(credentials.generateMetadata.bind(credentials))({}); + constructor(private readonly credentials: CallCredentials) { + super(); } - async sendMetadata(metadata: Promise) { - return (await metadata).merge(await this.credsMetadata); + async sendMetadata(metadata: Promise): Promise { + // TODO(murgatroid99): pass real options to generateMetadata + let credsMetadata = this.credentials.generateMetadata.bind({}); + let resultMetadata = await metadata; + resultMetadata.merge(await credsMetadata); + return resultMetadata; } } export class CallCredentialsFilterFactory implements FilterFactory { - private credentials: CallCredentials | null; + private readonly credentials: CallCredentials; constructor(channel: Http2Channel) { this.credentials = channel.credentials.getCallCredentials(); } createFilter(callStream: CallStream): CallCredentialsFilter { - return new CallCredentialsFilter(this.credentials.compose(callStream.credentials)); + return new CallCredentialsFilter(this.credentials.compose(callStream.getCredentials())); } } diff --git a/src/call-credentials.ts b/src/call-credentials.ts index 6e196823..4b121836 100644 --- a/src/call-credentials.ts +++ b/src/call-credentials.ts @@ -1,10 +1,10 @@ import { Metadata } from './metadata'; -import * as async from 'async'; +import {map, reduce} from 'lodash' export type CallMetadataGenerator = ( options: Object, cb: (err: Error | null, metadata?: Metadata) => void -) => void +) => void; /** * A class that represents a generic method of adding authentication-related @@ -14,17 +14,15 @@ export interface CallCredentials { /** * Asynchronously generates a new Metadata object. * @param options Options used in generating the Metadata object. - * @param cb A callback of the form (err, metadata) which will be called with - * either the generated metadata, or an error if one occurred. */ - generateMetadata: CallMetadataGenerator; + generateMetadata(options: Object): Promise; /** * Creates a new CallCredentials object from properties of both this and * another CallCredentials object. This object's metadata generator will be * called first. * @param callCredentials The other CallCredentials object. */ - compose: (callCredentials: CallCredentials) => CallCredentials; + compose(callCredentials: CallCredentials): CallCredentials; } export namespace CallCredentials { @@ -38,48 +36,60 @@ export namespace CallCredentials { export function createFromMetadataGenerator( metadataGenerator: CallMetadataGenerator ): CallCredentials { - return new CallCredentialsImpl([metadataGenerator]); + return new SingleCallCredentials(metadataGenerator); + } + + export function createEmpty(): CallCredentials { + return new EmptyCallCredentials(); } } +class ComposedCallCredentials implements CallCredentials { + constructor(private creds: CallCredentials[]) {} -class CallCredentialsImpl { - constructor(private metadataGenerators: Array) {} - - generateMetadata( - options: Object, - cb: (err: Error | null, metadata?: Metadata) => void - ): void { - if (this.metadataGenerators.length === 1) { - this.metadataGenerators[0](options, cb); - return; + async generateMetadata(options: Object): Promise { + let base: Metadata = new Metadata(); + let generated: Metadata[] = await Promise.all(map( + this.creds, (cred) => cred.generateMetadata(options))); + for (let gen of generated) { + base.merge(gen); } + return base; + } - const tasks: Array> = - this.metadataGenerators.map(fn => fn.bind(null, options)); - const callback: AsyncResultArrayCallback = - (err, metadataArray) => { - if (err || !metadataArray) { - cb(err || new Error('Unknown error')); - return; + compose(other: CallCredentials): CallCredentials { + return new ComposedCallCredentials(this.creds.concat([other])); + } +} + +class SingleCallCredentials implements CallCredentials{ + constructor(private metadataGenerator: CallMetadataGenerator) {} + + async generateMetadata(options: Object): Promise { + return new Promise((resolve, reject) => { + this.metadataGenerator(options, (err, metadata) => { + if (metadata !== undefined) { + resolve(metadata); } else { - const result: Metadata = new Metadata(); - metadataArray.forEach((metadata) => { - if (metadata) { - result.merge(metadata); - } - }); - cb(null, result); + reject(err); } - }; - async.parallel(tasks, callback); + }); + }); } - compose(callCredentials: CallCredentials): CallCredentials { - if (!(callCredentials instanceof CallCredentialsImpl)) { - throw new Error('Unknown CallCredentials implementation provided'); - } - return new CallCredentialsImpl(this.metadataGenerators.concat( - (callCredentials as CallCredentialsImpl).metadataGenerators)); + compose(other: CallCredentials): CallCredentials { + return new ComposedCallCredentials([this, other]); + } +} + +class EmptyCallCredentials implements CallCredentials { + constructor () {} + + async generateMetadata(options: Object): Promise { + return new Metadata(); + } + + compose(other:CallCredentials): CallCredentials { + return other; } } diff --git a/src/call-stream.ts b/src/call-stream.ts index f28d9789..5f788f79 100644 --- a/src/call-stream.ts +++ b/src/call-stream.ts @@ -1,5 +1,7 @@ import * as stream from 'stream'; +import * as http2 from 'http2'; + import {CallCredentials} from './call-credentials'; import {Status} from './constants'; import {Metadata} from './metadata'; @@ -7,13 +9,21 @@ import {ObjectDuplex} from './object-stream'; import {Filter} from './filter' import {FilterStackFactory} from './filter-stack' -export interface CallOptions { - deadline?: Date|number; - host?: string; - credentials?: CallCredentials; - flags?: number; +const { + HTTP2_HEADER_STATUS, + HTTP2_HEADER_CONTENT_TYPE +} = http2.constants; + +export type Deadline = Date | number; + +export interface CallStreamOptions { + deadline: Deadline; + credentials: CallCredentials; + flags: number; } +export type CallOptions = Partial; + export interface StatusObject { code: Status; details: string; @@ -32,6 +42,12 @@ export interface CallStream extends ObjectDuplex { cancelWithStatus(status: Status, details: string): void; getPeer(): string; + getDeadline(): Deadline; + getCredentials(): CallCredentials; + /* If the return value is null, the call has not ended yet. Otherwise, it has + * ended with the specified status */ + getStatus(): StatusObject | null; + addListener(event: string, listener: Function): this; emit(event: string|symbol, ...args: any[]): boolean; on(event: string, listener: Function): this; @@ -71,16 +87,16 @@ enum ReadState { export class Http2CallStream extends stream.Duplex implements CallStream { - private filterStack: Filter; - private statusEmitted: bool = false; - private http2Stream: ClientHttp2Stream | null = null; - private pendingRead: bool = false; + public filterStack: Filter; + private statusEmitted: boolean = false; + private http2Stream: http2.ClientHttp2Stream | null = null; + private pendingRead: boolean = false; private pendingWrite: Buffer | null = null; private pendingWriteCallback: Function | null = null; private pendingFinalCallback: Function | null = null; private readState: ReadState = ReadState.NO_DATA; - private readCompressFlag: bool = false; + private readCompressFlag: boolean = false; private readPartialSize: Buffer = Buffer.alloc(4); private readSizeRemaining: number = 4; private readMessageSize: number = 0; @@ -92,21 +108,26 @@ export class Http2CallStream extends stream.Duplex implements CallStream { // Status code mapped from :status. To be used if grpc-status is not received private mappedStatusCode: Status = Status.UNKNOWN; - constructor(public readonly methodName: string, public readonly options: CallOptions, + // This is populated (non-null) if and only if the call has ended + private finalStatus: StatusObject | null = null; + + constructor(private readonly methodName: string, + private readonly options: CallStreamOptions, filterStackFactory: FilterStackFactory) { - this.filterStack = FilterStackFactory.createFilter(this); + super({objectMode: true}); + this.filterStack = filterStackFactory.createFilter(this); } private endCall(status: StatusObject): void { - if (!this.statusEmitted) { - this.emit('status', {code: status, details: details, metadata: new Metadata()}); - this.statusEmitted = true; + if (!this.finalStatus === null) { + this.finalStatus = status; + this.emit('status', status); } } - attachHttp2Stream(stream: ClientHttp2Stream): void { - if (this.statusEmitted) { - // TODO(murgatroid99): Handle call end before http2 stream start + attachHttp2Stream(stream: http2.ClientHttp2Stream): void { + if (this.finalStatus !== null) { + stream.rstWithCancel(); } else { this.http2Stream = stream; stream.on('response', (headers) => { @@ -133,8 +154,8 @@ export class Http2CallStream extends stream.Duplex implements CallStream { default: this.mappedStatusCode = Status.UNKNOWN; } - delete headers[HTTP2_HEADERS_STATUS]; - delete headers[HTTP2_HEADERS_CONTENT_TYPE]; + delete headers[HTTP2_HEADER_STATUS]; + delete headers[HTTP2_HEADER_CONTENT_TYPE]; let metadata: Metadata; try { metadata = Metadata.fromHttp2Headers(headers); @@ -152,7 +173,7 @@ export class Http2CallStream extends stream.Duplex implements CallStream { let code: Status = this.mappedStatusCode; if (headers.hasOwnProperty('grpc-status')) { let receivedCode = Number(headers['grpc-status']); - if (possibleCode in Status) { + if (receivedCode in Status) { code = receivedCode; } else { code = Status.UNKNOWN; @@ -169,23 +190,29 @@ export class Http2CallStream extends stream.Duplex implements CallStream { } catch (e) { metadata = new Metadata(); } + let status: StatusObject = { + code: code, + details: details, + metadata: metadata + }; this.filterStack.receiveTrailers(Promise.resolve(status)).then((finalStatus) => { this.endCall(finalStatus); }, (error) => { this.endCall({ code: Status.INTERNAL, details: 'Failed to process received status', - metadata: new Metadata(); + metadata: new Metadata() }); }); }); stream.on('read', (data) => { let readHead = 0; let canPush = true; + let toRead: number; while (readHead < data.length) { switch(this.readState) { case ReadState.NO_DATA: - readCompressFlag = (data.readUInt8(readHead) !== 0); + this.readCompressFlag = (data.readUInt8(readHead) !== 0); this.readState = ReadState.READING_SIZE; this.readPartialSize.fill(0); this.readSizeRemaining = 4; @@ -194,31 +221,31 @@ export class Http2CallStream extends stream.Duplex implements CallStream { this.readPartialMessage = []; break; case ReadState.READING_SIZE: - let toRead: number = Math.min(data.length - readHead, this.readSizeRemaining); - data.copy(readPartialSize, 4 - this.readSizeRemaining, readHead, readHead + toRead); + toRead = Math.min(data.length - readHead, this.readSizeRemaining); + data.copy(this.readPartialSize, 4 - this.readSizeRemaining, readHead, readHead + toRead); this.readSizeRemaining -= toRead; readHead += toRead; // readSizeRemaining >=0 here if (this.readSizeRemaining === 0) { - this.readMessageSize = readPartialSize.readUInt32BE(0); + this.readMessageSize = this.readPartialSize.readUInt32BE(0); this.readMessageRemaining = this.readMessageSize; this.readState = ReadState.READING_MESSAGE; } break; - case ReadSize.READING_MESSAGE: - let toRead: number = math.min(data.length - readHead, this.readMessageRemaining); - readPartialMessage.push(data.slice(readHead, readHead + toRead)); + case ReadState.READING_MESSAGE: + toRead = Math.min(data.length - readHead, this.readMessageRemaining); + this.readPartialMessage.push(data.slice(readHead, readHead + toRead)); this.readMessageRemaining -= toRead; - this.readHead += toRead; + readHead += toRead; // readMessageRemaining >=0 here if (this.readMessageRemaining === 0) { // At this point, we have read a full message - let messageBytes = Buffer.concat(readPartialMessage, readMessageSize); + let messageBytes = Buffer.concat(this.readPartialMessage, this.readMessageSize); // TODO(murgatroid99): Add receive message filters if (canPush) { if (!this.push(messageBytes)) { canPush = false; - this.http2Stream.pause(); + (this.http2Stream as http2.ClientHttp2Stream).pause(); } } else { this.unpushedReadMessages.push(messageBytes); @@ -234,6 +261,40 @@ export class Http2CallStream extends stream.Duplex implements CallStream { this.unpushedReadMessages.push(null); } }); + stream.on('streamClosed', (errorCode) => { + let code: Status; + let details: string = ''; + switch(errorCode) { + case http2.constants.NGHTTP2_REFUSED_STREAM: + code = Status.UNAVAILABLE; + break; + case http2.constants.NGHTTP2_CANCEL: + code = Status.CANCELLED; + break; + case http2.constants.NGHTTP2_ENHANCE_YOUR_CALM: + code = Status.RESOURCE_EXHAUSTED; + details = 'Bandwidth exhausted'; + break; + case http2.constants.NGHTTP2_INADEQUATE_SECURITY: + code = Status.PERMISSION_DENIED; + details = 'Protocol not secure enough'; + break; + default: + code = Status.INTERNAL; + } + this.endCall({ + code: code, + details: details, + metadata: new Metadata() + }); + }); + stream.on('error', () => { + this.endCall({ + code: Status.INTERNAL, + details: 'Internal HTTP2 error', + metadata: new Metadata() + }); + }); } } @@ -246,6 +307,18 @@ export class Http2CallStream extends stream.Duplex implements CallStream { } } + getDeadline(): Deadline { + return this.options.deadline; + } + + getCredentials(): CallCredentials { + return this.options.credentials; + } + + getStatus(): StatusObject | null { + return this.finalStatus; + } + getPeer(): string { throw new Error('Not yet implemented'); } @@ -254,8 +327,8 @@ export class Http2CallStream extends stream.Duplex implements CallStream { if (this.http2Stream === null) { this.pendingRead = true; } else { - while (unpushedReadMessages.length > 0) { - let nextMessage = unpushedReadMessages.shift(); + while (this.unpushedReadMessages.length > 0) { + let nextMessage = this.unpushedReadMessages.shift(); let keepPushing = this.push(nextMessage); if (nextMessage === null || (!keepPushing)) { return; @@ -270,19 +343,19 @@ export class Http2CallStream extends stream.Duplex implements CallStream { // Encode a message to the wire format private encodeMessage(message: WriteObject): Buffer { - /* unsafeAlloc doesn't initiate the bytes in the buffer. We are explicitly + /* allocUnsafe doesn't initiate the bytes in the buffer. We are explicitly * overwriting every single byte, so that should be fine */ - let output: Buffer = Buffer.unsafeAlloc(message.length + 5); + let output: Buffer = Buffer.allocUnsafe(message.message.length + 5); // TODO(murgatroid99): handle compressed flag appropriately output.writeUInt8(0, 0); - output.writeUint32BE(message.message.length, 1); + output.writeUInt32BE(message.message.length, 1); message.message.copy(output, 5); return output; } _write(chunk: WriteObject, encoding: string, cb: Function) { // TODO(murgatroid99): Add send message filters - let encodedMessage = encodeMessage(chunk); + let encodedMessage = this.encodeMessage(chunk); if (this.http2Stream === null) { this.pendingWrite = encodedMessage; this.pendingWriteCallback = cb; diff --git a/src/channel-credentials.ts b/src/channel-credentials.ts index 975ff310..9a34fc0d 100644 --- a/src/channel-credentials.ts +++ b/src/channel-credentials.ts @@ -18,8 +18,8 @@ export interface ChannelCredentials { /** * Gets the set of per-call credentials associated with this instance. */ - getCallCredentials() : CallCredentials | null; - + getCallCredentials() : CallCredentials; + /** * Gets a SecureContext object generated from input parameters if this * instance was created with createSsl, or null if this instance was created @@ -62,15 +62,15 @@ export namespace ChannelCredentials { abstract class ChannelCredentialsImpl implements ChannelCredentials { - protected callCredentials: CallCredentials | null; + protected callCredentials: CallCredentials; protected constructor(callCredentials?: CallCredentials) { - this.callCredentials = callCredentials || null; + this.callCredentials = callCredentials || CallCredentials.createEmpty(); } abstract compose(callCredentials: CallCredentials) : ChannelCredentialsImpl; - getCallCredentials() : CallCredentials | null { + getCallCredentials() : CallCredentials { return this.callCredentials; } @@ -83,10 +83,7 @@ class InsecureChannelCredentialsImpl extends ChannelCredentialsImpl { } compose(callCredentials: CallCredentials) : ChannelCredentialsImpl { - const combinedCallCredentials = this.callCredentials ? - this.callCredentials.compose(callCredentials) : - callCredentials; - return new InsecureChannelCredentialsImpl(combinedCallCredentials); + throw new Error("Cannot compose insecure credentials"); } getSecureContext() : SecureContext | null { @@ -106,9 +103,8 @@ class SecureChannelCredentialsImpl extends ChannelCredentialsImpl { } compose(callCredentials: CallCredentials) : ChannelCredentialsImpl { - const combinedCallCredentials = this.callCredentials ? - this.callCredentials.compose(callCredentials) : - callCredentials; + const combinedCallCredentials = + this.callCredentials.compose(callCredentials); return new SecureChannelCredentialsImpl(this.secureContext, combinedCallCredentials); } diff --git a/src/channel.ts b/src/channel.ts index 9f6e1d4b..aa3b92fb 100644 --- a/src/channel.ts +++ b/src/channel.ts @@ -1,9 +1,9 @@ import {EventEmitter} from 'events'; import {SecureContext} from 'tls'; import * as http2 from 'http2'; -import {IncomingHttpHeaders, OutgoingHttpHeaders} from 'http'; import * as url from 'url'; -import {CallOptions, CallStream} from './call-stream'; +import {CallOptions, CallStreamOptions, CallStream, Http2CallStream} from './call-stream'; +import {CallCredentials} from './call-credentials'; import {ChannelCredentials} from './channel-credentials'; import {Metadata, MetadataObject} from './metadata'; import {Status} from './constants' @@ -11,12 +11,13 @@ import {Status} from './constants' import {FilterStackFactory} from './filter-stack' import {DeadlineFilterFactory} from './deadline-filter' import {CallCredentialsFilterFactory} from './call-credentials-filter' -import {Http2FilterFactory} from './http2-filter' +import {CompressionFilterFactory} from './compression-filter' const IDLE_TIMEOUT_MS = 300000; const { HTTP2_HEADER_AUTHORITY, + HTTP2_HEADER_CONTENT_TYPE, HTTP2_HEADER_METHOD, HTTP2_HEADER_PATH, HTTP2_HEADER_SCHEME, @@ -44,8 +45,8 @@ export enum ConnectivityState { * by a given address. */ export interface Channel extends EventEmitter { - createStream(methodName: string, metadata: OutgoingHttp2Headers, options: CallOptions): CallStream; - connect(() => void): void; + createStream(methodName: string, metadata: Metadata, options: CallOptions): CallStream; + connect(callback: () => void): void; getConnectivityState(): ConnectivityState; close(): void; @@ -63,8 +64,7 @@ export class Http2Channel extends EventEmitter implements Channel { private idleTimerId: NodeJS.Timer | null = null; /* For now, we have up to one subchannel, which will exist as long as we are * connecting or trying to connect */ - private subChannel : Http2Session | null; - private address : url.Url; + private subChannel : http2.ClientHttp2Session | null; private filterStackFactory : FilterStackFactory; private transitionToState(newState: ConnectivityState): void { @@ -76,7 +76,12 @@ export class Http2Channel extends EventEmitter implements Channel { private startConnecting(): void { this.transitionToState(ConnectivityState.CONNECTING); - this.subChannel = http2.connect(address, { secureContext: this.secureContext }); + let secureContext = this.credentials.getSecureContext(); + if (secureContext === null) { + this.subChannel = http2.connect(this.address); + } else { + this.subChannel = http2.connect(this.address, {secureContext}); + } this.subChannel.on('connect', () => { this.transitionToState(ConnectivityState.READY); }); @@ -86,7 +91,10 @@ export class Http2Channel extends EventEmitter implements Channel { } private goIdle(): void { - this.subChannel.shutdown({graceful: true}); + if (this.subChannel !== null) { + this.subChannel.shutdown({graceful: true}, () => {}); + this.subChannel = null; + } this.transitionToState(ConnectivityState.IDLE); } @@ -96,10 +104,11 @@ export class Http2Channel extends EventEmitter implements Channel { } } - constructor(private readonly address: url.Url, + constructor(private readonly address: url.URL, public readonly credentials: ChannelCredentials, private readonly options: ChannelOptions) { - if (channelCredentials.getSecureContext() === null) { + super(); + if (credentials.getSecureContext() === null) { address.protocol = 'http'; } else { address.protocol = 'https'; @@ -111,11 +120,7 @@ export class Http2Channel extends EventEmitter implements Channel { ]); } - createStream(methodName: string, metadata: Metadata, options: CallOptions): CallStream { - if (this.connectivityState === ConnectivityState.SHUTDOWN) { - throw new Error('Channel has been shut down'); - } - let stream: Http2CallStream = new Http2CallStream(methodName, options, this.filterStackFactory); + private startHttp2Stream(methodName: string, stream: Http2CallStream, metadata: Metadata) { let finalMetadata: Promise = stream.filterStack.sendMetadata(Promise.resolve(metadata)); this.connect(() => { finalMetadata.then((metadataValue) => { @@ -125,13 +130,37 @@ export class Http2Channel extends EventEmitter implements Channel { headers[HTTP2_HEADER_METHOD] = 'POST'; headers[HTTP2_HEADER_PATH] = methodName; headers[HTTP2_HEADER_TE] = 'trailers'; - if (stream.isOpen()) { - stream.attachHttp2Stream(this.subchannel.request(headers)); + if (stream.getStatus() === null) { + if (this.connectivityState === ConnectivityState.READY) { + let session: http2.ClientHttp2Session = + (this.subChannel as http2.ClientHttp2Session); + stream.attachHttp2Stream(session.request(headers)); + } else { + /* In this case, we lost the connection while finalizing metadata. + * That should be very unusual */ + setImmediate(() => { + this.startHttp2Stream(methodName, stream, metadata); + }); + } } }, (error) => { stream.cancelWithStatus(Status.UNKNOWN, "Failed to generate metadata"); }); }); + } + + createStream(methodName: string, metadata: Metadata, options: CallOptions): CallStream { + if (this.connectivityState === ConnectivityState.SHUTDOWN) { + throw new Error('Channel has been shut down'); + } + let finalOptions: CallStreamOptions = { + deadline: options.deadline === undefined ? Infinity : options.deadline, + credentials: options.credentials === undefined ? + CallCredentials.createEmpty() : options.credentials, + flags: options.flags === undefined ? 0 : options.flags + } + let stream: Http2CallStream = new Http2CallStream(methodName, finalOptions, this.filterStackFactory); + this.startHttp2Stream(methodName, stream, metadata); return stream; } @@ -157,6 +186,8 @@ export class Http2Channel extends EventEmitter implements Channel { throw new Error('Channel has been shut down'); } this.transitionToState(ConnectivityState.SHUTDOWN); - this.subChannel.shutdown({graceful: true}); + if (this.subChannel !== null) { + this.subChannel.shutdown({graceful: true}); + } } } diff --git a/src/client.ts b/src/client.ts index e289284c..883efbee 100644 --- a/src/client.ts +++ b/src/client.ts @@ -1,6 +1,9 @@ +import {once} from 'lodash'; +import {URL} from 'url'; + import {ClientDuplexStream, ClientDuplexStreamImpl, ClientReadableStream, ClientReadableStreamImpl, ClientUnaryCall, ClientUnaryCallImpl, ClientWritableStream, ClientWritableStreamImpl, ServiceError, ServiceErrorImpl} from './call'; import {CallOptions, CallStream, StatusObject, WriteObject} from './call-stream'; -import {Channel, ChannelOptions} from './channel'; +import {Channel, Http2Channel, ChannelOptions} from './channel'; import {ChannelCredentials} from './channel-credentials'; import {Status} from './constants'; import {Metadata} from './metadata'; @@ -21,7 +24,7 @@ export class Client { } // TODO(murgatroid99): Figure out how to get version number // options['grpc.primary_user_agent'] += 'grpc-node/' + version; - this.channel = new Channel(address, credentials, options); + this.channel = new Http2Channel(new URL(address), credentials, options); } close(): void { @@ -29,8 +32,27 @@ export class Client { } waitForReady(deadline: Date|number, callback: (error: Error|null) => void): - void { - throw new Error('waitForReady is not yet implemented'); + void { + let cb : (error: Error|null) => void = once(callback); + let callbackCalled: boolean = false; + this.channel.connect(() => { + cb(null); + }); + if (deadline != Infinity) { + let timeout: number; + let now: number = (new Date).getTime(); + if (deadline instanceof Date) { + timeout = deadline.getTime() - now; + } else { + timeout = deadline - now; + } + if (timeout < 0) { + timeout = 0; + } + setTimeout(() => { + cb(new Error('Failed to connect before the deadline')); + }, timeout); + } } private handleUnaryResponse( diff --git a/src/compression-filter.ts b/src/compression-filter.ts index 83d31692..b010f4c5 100644 --- a/src/compression-filter.ts +++ b/src/compression-filter.ts @@ -1,8 +1,12 @@ -import {Filter, BaseFilter} from './filter' +import {CallStream} from './call-stream' +import {Channel} from './channel' +import {Filter, BaseFilter, FilterFactory} from './filter' import {Metadata} from './metadata' export class CompressionFilter extends BaseFilter implements Filter { - constructor() {} + constructor() { + super(); + } async sendMetadata(metadata: Promise): Promise { let headers: Metadata = await metadata; @@ -11,7 +15,7 @@ export class CompressionFilter extends BaseFilter implements Filter { return headers; } - async receiveMetadata(metadata: Promise): Promise): Promise { let headers: Metadata = await metadata; headers.remove('grpc-encoding'); headers.remove('grpc-accept-encoding'); @@ -19,8 +23,8 @@ export class CompressionFilter extends BaseFilter implements Filter { } } -export class CompressionFilterFactory { - constructor(channel) {} +export class CompressionFilterFactory implements FilterFactory { + constructor(channel: Channel) {} createFilter(callStream: CallStream): CompressionFilter { return new CompressionFilter(); } diff --git a/src/deadline-filter.ts b/src/deadline-filter.ts index b0dc378e..5796162e 100644 --- a/src/deadline-filter.ts +++ b/src/deadline-filter.ts @@ -1,7 +1,10 @@ -import {Filter} from './filter' +import {CallStream} from './call-stream' +import {Channel, Http2Channel} from './channel' +import {Filter, BaseFilter, FilterFactory} from './filter' import {Status} from './constants' +import {Metadata} from './metadata' -const units = [ +const units: [string, number][] = [ ['m', 1], ['S', 1000], ['M', 60 * 1000], @@ -9,16 +12,21 @@ const units = [ ] export class DeadlineFilter extends BaseFilter implements Filter { - private deadline; - constructor(private readonly channel: Channel, private readonly callStream: CallStream) { - let deadline = callStream.deadline; - this.deadline = deadline; + private deadline: number; + constructor(private readonly channel: Http2Channel, private readonly callStream: CallStream) { + super(); + let callDeadline = callStream.getDeadline(); + if (callDeadline instanceof Date) { + this.deadline = callDeadline.getTime(); + } else { + this.deadline = callDeadline; + } let now: number = (new Date()).getTime(); - let timeout = deadline - now; + let timeout = this.deadline - now; if (timeout < 0) { timeout = 0; } - if (deadline !== Infinity) { + if (this.deadline !== Infinity) { setTimeout(() => { callStream.cancelWithStatus(Status.DEADLINE_EXCEEDED, 'Deadline exceeded'); }, timeout); @@ -42,7 +50,9 @@ export class DeadlineFilter extends BaseFilter implements Filter { } }); }); - (await metadata).set('grpc-timeout', await timeoutString); + let finalMetadata = await metadata; + finalMetadata.set('grpc-timeout', await timeoutString); + return finalMetadata; } } diff --git a/src/filter-stack.ts b/src/filter-stack.ts index 28e7dd99..2a402c3d 100644 --- a/src/filter-stack.ts +++ b/src/filter-stack.ts @@ -1,26 +1,28 @@ -import {flow, map} from 'lodash'; -import {Filter} from './filter' +import {flow, flowRight, map} from 'lodash'; +import {Metadata} from './metadata'; +import {CallStream, StatusObject} from './call-stream' +import {Filter, FilterFactory} from './filter'; export class FilterStack implements Filter { constructor(private readonly filters: Filter[]) {} async sendMetadata(metadata: Promise) { - return await flow(map(filters, (filter) => filter.sendMetadata.bind(filter)))(metadata); + return await flow(map(this.filters, (filter) => filter.sendMetadata.bind(filter)))(metadata); } async receiveMetadata(metadata: Promise) { - return await flowRight(map(filters, (filter) => filter.receiveMetadata.bind(filter)))(metadata); + return await flowRight(map(this.filters, (filter) => filter.receiveMetadata.bind(filter)))(metadata); } async receiveTrailers(status: Promise): Promise { - return await flowRight(map(filters, (filter) => filter.receiveTrailers.bind(filter)))(status); + return await flowRight(map(this.filters, (filter) => filter.receiveTrailers.bind(filter)))(status); } } export class FilterStackFactory implements FilterFactory { - constructor(private readonly factories: FilterFactory[]) {} + constructor(private readonly factories: FilterFactory[]) {} createFilter(callStream: CallStream): FilterStack { - return new FilterStack(map(factories, (factory) => factory.createFilter(callStream))); + return new FilterStack(map(this.factories, (factory) => factory.createFilter(callStream))); } } diff --git a/src/filter.ts b/src/filter.ts index 7d6a7a29..058c45c2 100644 --- a/src/filter.ts +++ b/src/filter.ts @@ -1,12 +1,12 @@ import {Metadata} from './metadata' -import {WriteObject, CallStream} from './call-stream' +import {StatusObject, CallStream} from './call-stream' export interface Filter { - async sendMetadata(metadata: Promise): Promise; + sendMetadata(metadata: Promise): Promise; - async receiveMetadata(metadata: Promise): Promise; + receiveMetadata(metadata: Promise): Promise; - async receiveTrailers(status: Promise): Promise; + receiveTrailers(status: Promise): Promise; } export abstract class BaseFilter { diff --git a/src/metadata.ts b/src/metadata.ts index e9e71208..979d1230 100644 --- a/src/metadata.ts +++ b/src/metadata.ts @@ -65,7 +65,8 @@ function validate(key: string, value?: MetadataValue): void { * A class for storing metadata. Keys are normalized to lowercase ASCII. */ export class Metadata { - constructor(protected readonly internalRepr: MetadataObject = {}) {} + private internalRepr: MetadataObject; + constructor() {} /** * Sets the given value for the given key by replacing any other values @@ -145,7 +146,9 @@ export class Metadata { * @return The newly cloned object. */ clone(): Metadata { - return new Metadata(cloneMetadataObject(this.internalRepr)); + let newMetadata = new Metadata(); + newMetadata.internalRepr = cloneMetadataObject(this.internalRepr); + return newMetadata; } /**