From 4328808943923d3ec2b0cc08036d6ee80417cbee Mon Sep 17 00:00:00 2001
From: murgatroid99 <mlumish@google.com>
Date: Thu, 24 Aug 2017 18:01:40 -0700
Subject: [PATCH] Finished initial Channel and CallStream implementations

---
 package.json                   |  15 ++--
 src/call-credentials-filter.ts |  24 +++---
 src/call-credentials.ts        |  88 ++++++++++---------
 src/call-stream.ts             | 149 ++++++++++++++++++++++++---------
 src/channel-credentials.ts     |  20 ++---
 src/channel.ts                 |  69 ++++++++++-----
 src/client.ts                  |  30 ++++++-
 src/compression-filter.ts      |  14 ++--
 src/deadline-filter.ts         |  28 +++++--
 src/filter-stack.ts            |  16 ++--
 src/filter.ts                  |   8 +-
 src/metadata.ts                |   7 +-
 12 files changed, 310 insertions(+), 158 deletions(-)

diff --git a/package.json b/package.json
index e68c401b..ad72cd67 100644
--- a/package.json
+++ b/package.json
@@ -14,24 +14,23 @@
   },
   "license": "Apache-2.0",
   "devDependencies": {
-    "@types/mocha": "^2.2.41",
-    "@types/node": "^8.0.19",
+    "@types/mocha": "^2.2.42",
+    "@types/node": "^8.0.25",
     "clang-format": "^1.0.53",
     "del": "^3.0.0",
-    "google-ts-style": "latest",
+    "google-ts-style": "^0.2.0",
     "gulp": "^3.9.1",
     "gulp-help": "^1.6.1",
     "gulp-mocha": "^4.3.1",
-    "gulp-sourcemaps": "^2.6.0",
+    "gulp-sourcemaps": "^2.6.1",
     "gulp-tslint": "^8.1.1",
-    "gulp-typescript": "^3.2.1",
+    "gulp-typescript": "^3.2.2",
     "gulp-util": "^3.0.8",
-    "h2-types": "git+https://github.com/kjin/node-h2-types.git",
     "merge2": "^1.1.0",
     "mocha": "^3.5.0",
     "through2": "^2.0.3",
     "tslint": "^5.5.0",
-    "typescript": "^2.4.1"
+    "typescript": "^2.5.1"
   },
   "contributors": [
     {
@@ -48,9 +47,7 @@
     "test": "gulp test"
   },
   "dependencies": {
-    "@types/async": "^2.0.41",
     "@types/lodash": "^4.14.73",
-    "async": "^2.5.0",
     "lodash": "^4.17.4"
   }
 }
diff --git a/src/call-credentials-filter.ts b/src/call-credentials-filter.ts
index 7b10072d..a508e846 100644
--- a/src/call-credentials-filter.ts
+++ b/src/call-credentials-filter.ts
@@ -1,28 +1,32 @@
 import {promisify} from 'util'
-import {Filter} from './filter'
+import {Filter, BaseFilter, FilterFactory} from './filter'
 import {CallCredentials} from './call-credentials'
+import {Http2Channel} from './channel'
+import {CallStream} from './call-stream'
+import {Metadata} from './metadata'
 
 export class CallCredentialsFilter extends BaseFilter implements Filter {
 
-  private credsMetadata: Promise<Metadata>;
-
-  constructor(credentials: CallCredentials) {
-    // TODO(murgatroid99): pass real options to generateMetadata
-    credsMetadata = util.promisify(credentials.generateMetadata.bind(credentials))({});
+  constructor(private readonly credentials: CallCredentials) {
+    super();
   }
 
-  async sendMetadata(metadata: Promise<Metadata>) {
-    return (await metadata).merge(await this.credsMetadata);
+  async sendMetadata(metadata: Promise<Metadata>): Promise<Metadata> {
+    // TODO(murgatroid99): pass real options to generateMetadata
+    let credsMetadata = this.credentials.generateMetadata.bind({});
+    let resultMetadata = await metadata;
+    resultMetadata.merge(await credsMetadata);
+    return resultMetadata;
   }
 }
 
 export class CallCredentialsFilterFactory implements FilterFactory<CallCredentialsFilter> {
-  private credentials: CallCredentials | null;
+  private readonly credentials: CallCredentials;
   constructor(channel: Http2Channel) {
     this.credentials = channel.credentials.getCallCredentials();
   }
 
   createFilter(callStream: CallStream): CallCredentialsFilter {
-    return new CallCredentialsFilter(this.credentials.compose(callStream.credentials));
+    return new CallCredentialsFilter(this.credentials.compose(callStream.getCredentials()));
   }
 }
diff --git a/src/call-credentials.ts b/src/call-credentials.ts
index 6e196823..4b121836 100644
--- a/src/call-credentials.ts
+++ b/src/call-credentials.ts
@@ -1,10 +1,10 @@
 import { Metadata } from './metadata';
-import * as async from 'async';
+import {map, reduce} from 'lodash'
 
 export type CallMetadataGenerator = (
   options: Object,
   cb: (err: Error | null, metadata?: Metadata) => void
-) => void
+) => void;
 
 /**
  * A class that represents a generic method of adding authentication-related
@@ -14,17 +14,15 @@ export interface CallCredentials {
   /**
    * Asynchronously generates a new Metadata object.
    * @param options Options used in generating the Metadata object.
-   * @param cb A callback of the form (err, metadata) which will be called with
-   * either the generated metadata, or an error if one occurred.
    */
-  generateMetadata: CallMetadataGenerator;
+  generateMetadata(options: Object): Promise<Metadata>;
   /**
    * Creates a new CallCredentials object from properties of both this and
    * another CallCredentials object. This object's metadata generator will be
    * called first.
    * @param callCredentials The other CallCredentials object.
    */
-  compose: (callCredentials: CallCredentials) => CallCredentials;
+  compose(callCredentials: CallCredentials): CallCredentials;
 }
 
 export namespace CallCredentials {
@@ -38,48 +36,60 @@ export namespace CallCredentials {
   export function createFromMetadataGenerator(
     metadataGenerator: CallMetadataGenerator
   ): CallCredentials {
-    return new CallCredentialsImpl([metadataGenerator]);
+    return new SingleCallCredentials(metadataGenerator);
+  }
+
+  export function createEmpty(): CallCredentials {
+    return new EmptyCallCredentials();
   }
 }
 
+class ComposedCallCredentials implements CallCredentials {
+  constructor(private creds: CallCredentials[]) {}
 
-class CallCredentialsImpl {
-  constructor(private metadataGenerators: Array<CallMetadataGenerator>) {}
-
-  generateMetadata(
-    options: Object,
-    cb: (err: Error | null, metadata?: Metadata) => void
-  ): void {
-    if (this.metadataGenerators.length === 1) {
-      this.metadataGenerators[0](options, cb);
-      return;
+  async generateMetadata(options: Object): Promise<Metadata> {
+    let base: Metadata = new Metadata();
+    let generated: Metadata[] = await Promise.all(map(
+      this.creds, (cred) => cred.generateMetadata(options)));
+    for (let gen of generated) {
+      base.merge(gen);
     }
+    return base;
+  }
 
-    const tasks: Array<AsyncFunction<Metadata, Error>> =
-      this.metadataGenerators.map(fn => fn.bind(null, options));
-    const callback: AsyncResultArrayCallback<Metadata, Error> =
-      (err, metadataArray) => {
-        if (err || !metadataArray) {
-          cb(err || new Error('Unknown error'));
-          return;
+  compose(other: CallCredentials): CallCredentials {
+    return new ComposedCallCredentials(this.creds.concat([other]));
+  }
+}
+
+class SingleCallCredentials implements CallCredentials{
+  constructor(private metadataGenerator: CallMetadataGenerator) {}
+
+  async generateMetadata(options: Object): Promise<Metadata> {
+    return new Promise<Metadata>((resolve, reject) => {
+      this.metadataGenerator(options, (err, metadata) => {
+        if (metadata !== undefined) {
+          resolve(metadata);
         } else {
-          const result: Metadata = new Metadata();
-          metadataArray.forEach((metadata) => {
-            if (metadata) {
-              result.merge(metadata);
-            }
-          });
-          cb(null, result);
+          reject(err);
         }
-      };
-    async.parallel(tasks, callback);
+      });
+    });
   }
 
-  compose(callCredentials: CallCredentials): CallCredentials {
-    if (!(callCredentials instanceof CallCredentialsImpl)) {
-      throw new Error('Unknown CallCredentials implementation provided');
-    }
-    return new CallCredentialsImpl(this.metadataGenerators.concat(
-      (callCredentials as CallCredentialsImpl).metadataGenerators));
+  compose(other: CallCredentials): CallCredentials {
+    return new ComposedCallCredentials([this, other]);
+  }
+}
+
+class EmptyCallCredentials implements CallCredentials {
+  constructor () {}
+
+  async generateMetadata(options: Object): Promise<Metadata> {
+    return new Metadata();
+  }
+
+  compose(other:CallCredentials): CallCredentials {
+    return other;
   }
 }
diff --git a/src/call-stream.ts b/src/call-stream.ts
index f28d9789..5f788f79 100644
--- a/src/call-stream.ts
+++ b/src/call-stream.ts
@@ -1,5 +1,7 @@
 import * as stream from 'stream';
 
+import * as http2  from 'http2';
+
 import {CallCredentials} from './call-credentials';
 import {Status} from './constants';
 import {Metadata} from './metadata';
@@ -7,13 +9,21 @@ import {ObjectDuplex} from './object-stream';
 import {Filter} from './filter'
 import {FilterStackFactory} from './filter-stack'
 
-export interface CallOptions {
-  deadline?: Date|number;
-  host?: string;
-  credentials?: CallCredentials;
-  flags?: number;
+const {
+  HTTP2_HEADER_STATUS,
+  HTTP2_HEADER_CONTENT_TYPE
+} = http2.constants;
+
+export type Deadline = Date | number;
+
+export interface CallStreamOptions {
+  deadline: Deadline;
+  credentials: CallCredentials;
+  flags: number;
 }
 
+export type CallOptions = Partial<CallStreamOptions>;
+
 export interface StatusObject {
   code: Status;
   details: string;
@@ -32,6 +42,12 @@ export interface CallStream extends ObjectDuplex<WriteObject, Buffer> {
   cancelWithStatus(status: Status, details: string): void;
   getPeer(): string;
 
+  getDeadline(): Deadline;
+  getCredentials(): CallCredentials;
+  /* If the return value is null, the call has not ended yet. Otherwise, it has
+   * ended with the specified status */
+  getStatus(): StatusObject | null;
+
   addListener(event: string, listener: Function): this;
   emit(event: string|symbol, ...args: any[]): boolean;
   on(event: string, listener: Function): this;
@@ -71,16 +87,16 @@ enum ReadState {
 
 export class Http2CallStream extends stream.Duplex implements CallStream {
 
-  private filterStack: Filter;
-  private statusEmitted: bool = false;
-  private http2Stream: ClientHttp2Stream | null = null;
-  private pendingRead: bool = false;
+  public filterStack: Filter;
+  private statusEmitted: boolean = false;
+  private http2Stream: http2.ClientHttp2Stream | null = null;
+  private pendingRead: boolean = false;
   private pendingWrite: Buffer | null = null;
   private pendingWriteCallback: Function | null = null;
   private pendingFinalCallback: Function | null = null;
 
   private readState: ReadState = ReadState.NO_DATA;
-  private readCompressFlag: bool = false;
+  private readCompressFlag: boolean = false;
   private readPartialSize: Buffer = Buffer.alloc(4);
   private readSizeRemaining: number = 4;
   private readMessageSize: number = 0;
@@ -92,21 +108,26 @@ export class Http2CallStream extends stream.Duplex implements CallStream {
   // Status code mapped from :status. To be used if grpc-status is not received
   private mappedStatusCode: Status = Status.UNKNOWN;
 
-  constructor(public readonly methodName: string, public readonly options: CallOptions,
+  // This is populated (non-null) if and only if the call has ended
+  private finalStatus: StatusObject | null = null;
+
+  constructor(private readonly methodName: string,
+              private readonly options: CallStreamOptions,
               filterStackFactory: FilterStackFactory) {
-    this.filterStack = FilterStackFactory.createFilter(this);
+    super({objectMode: true});
+    this.filterStack = filterStackFactory.createFilter(this);
   }
 
   private endCall(status: StatusObject): void {
-    if (!this.statusEmitted) {
-      this.emit('status', {code: status, details: details, metadata: new Metadata()});
-      this.statusEmitted = true;
+    if (!this.finalStatus === null) {
+      this.finalStatus = status;
+      this.emit('status', status);
     }
   }
 
-  attachHttp2Stream(stream: ClientHttp2Stream): void {
-    if (this.statusEmitted) {
-      // TODO(murgatroid99): Handle call end before http2 stream start
+  attachHttp2Stream(stream: http2.ClientHttp2Stream): void {
+    if (this.finalStatus !== null) {
+      stream.rstWithCancel();
     } else {
       this.http2Stream = stream;
       stream.on('response', (headers) => {
@@ -133,8 +154,8 @@ export class Http2CallStream extends stream.Duplex implements CallStream {
         default:
           this.mappedStatusCode = Status.UNKNOWN;
         }
-        delete headers[HTTP2_HEADERS_STATUS];
-        delete headers[HTTP2_HEADERS_CONTENT_TYPE];
+        delete headers[HTTP2_HEADER_STATUS];
+        delete headers[HTTP2_HEADER_CONTENT_TYPE];
         let metadata: Metadata;
         try {
           metadata = Metadata.fromHttp2Headers(headers);
@@ -152,7 +173,7 @@ export class Http2CallStream extends stream.Duplex implements CallStream {
         let code: Status = this.mappedStatusCode;
         if (headers.hasOwnProperty('grpc-status')) {
           let receivedCode = Number(headers['grpc-status']);
-          if (possibleCode in Status) {
+          if (receivedCode in Status) {
             code = receivedCode;
           } else {
             code = Status.UNKNOWN;
@@ -169,23 +190,29 @@ export class Http2CallStream extends stream.Duplex implements CallStream {
         } catch (e) {
           metadata = new Metadata();
         }
+        let status: StatusObject = {
+          code: code,
+          details: details,
+          metadata: metadata
+        };
         this.filterStack.receiveTrailers(Promise.resolve(status)).then((finalStatus) => {
           this.endCall(finalStatus);
         }, (error) => {
           this.endCall({
             code: Status.INTERNAL,
             details: 'Failed to process received status',
-            metadata: new Metadata();
+            metadata: new Metadata()
           });
         });
       });
       stream.on('read', (data) => {
         let readHead = 0;
         let canPush = true;
+        let toRead: number;
         while (readHead < data.length) {
           switch(this.readState) {
           case ReadState.NO_DATA:
-            readCompressFlag = (data.readUInt8(readHead) !== 0);
+            this.readCompressFlag = (data.readUInt8(readHead) !== 0);
             this.readState = ReadState.READING_SIZE;
             this.readPartialSize.fill(0);
             this.readSizeRemaining = 4;
@@ -194,31 +221,31 @@ export class Http2CallStream extends stream.Duplex implements CallStream {
             this.readPartialMessage = [];
             break;
           case ReadState.READING_SIZE:
-            let toRead: number = Math.min(data.length - readHead, this.readSizeRemaining);
-            data.copy(readPartialSize, 4 - this.readSizeRemaining, readHead, readHead + toRead);
+            toRead = Math.min(data.length - readHead, this.readSizeRemaining);
+            data.copy(this.readPartialSize, 4 - this.readSizeRemaining, readHead, readHead + toRead);
             this.readSizeRemaining -= toRead;
             readHead += toRead;
             // readSizeRemaining >=0 here
             if (this.readSizeRemaining === 0) {
-              this.readMessageSize = readPartialSize.readUInt32BE(0);
+              this.readMessageSize = this.readPartialSize.readUInt32BE(0);
               this.readMessageRemaining = this.readMessageSize;
               this.readState = ReadState.READING_MESSAGE;
             }
             break;
-          case ReadSize.READING_MESSAGE:
-            let toRead: number = math.min(data.length - readHead, this.readMessageRemaining);
-            readPartialMessage.push(data.slice(readHead, readHead + toRead));
+          case ReadState.READING_MESSAGE:
+            toRead = Math.min(data.length - readHead, this.readMessageRemaining);
+            this.readPartialMessage.push(data.slice(readHead, readHead + toRead));
             this.readMessageRemaining -= toRead;
-            this.readHead += toRead;
+            readHead += toRead;
             // readMessageRemaining >=0 here
             if (this.readMessageRemaining === 0) {
               // At this point, we have read a full message
-              let messageBytes = Buffer.concat(readPartialMessage, readMessageSize);
+              let messageBytes = Buffer.concat(this.readPartialMessage, this.readMessageSize);
               // TODO(murgatroid99): Add receive message filters
               if (canPush) {
                 if (!this.push(messageBytes)) {
                   canPush = false;
-                  this.http2Stream.pause();
+                  (this.http2Stream as http2.ClientHttp2Stream).pause();
                 }
               } else {
                 this.unpushedReadMessages.push(messageBytes);
@@ -234,6 +261,40 @@ export class Http2CallStream extends stream.Duplex implements CallStream {
           this.unpushedReadMessages.push(null);
         }
       });
+      stream.on('streamClosed', (errorCode) => {
+        let code: Status;
+        let details: string = '';
+        switch(errorCode) {
+        case http2.constants.NGHTTP2_REFUSED_STREAM:
+          code = Status.UNAVAILABLE;
+          break;
+        case http2.constants.NGHTTP2_CANCEL:
+          code = Status.CANCELLED;
+          break;
+        case http2.constants.NGHTTP2_ENHANCE_YOUR_CALM:
+          code = Status.RESOURCE_EXHAUSTED;
+          details = 'Bandwidth exhausted';
+          break;
+        case http2.constants.NGHTTP2_INADEQUATE_SECURITY:
+          code = Status.PERMISSION_DENIED;
+          details = 'Protocol not secure enough';
+          break;
+        default:
+          code = Status.INTERNAL;
+        }
+        this.endCall({
+          code: code,
+          details: details,
+          metadata: new Metadata()
+        });
+      });
+      stream.on('error', () => {
+        this.endCall({
+          code: Status.INTERNAL,
+          details: 'Internal HTTP2 error',
+          metadata: new Metadata()
+        });
+      });
     }
   }
 
@@ -246,6 +307,18 @@ export class Http2CallStream extends stream.Duplex implements CallStream {
     }
   }
 
+  getDeadline(): Deadline {
+    return this.options.deadline;
+  }
+
+  getCredentials(): CallCredentials {
+    return this.options.credentials;
+  }
+
+  getStatus(): StatusObject | null {
+    return this.finalStatus;
+  }
+
   getPeer(): string {
     throw new Error('Not yet implemented');
   }
@@ -254,8 +327,8 @@ export class Http2CallStream extends stream.Duplex implements CallStream {
     if (this.http2Stream === null) {
       this.pendingRead = true;
     } else {
-      while (unpushedReadMessages.length > 0) {
-        let nextMessage = unpushedReadMessages.shift();
+      while (this.unpushedReadMessages.length > 0) {
+        let nextMessage = this.unpushedReadMessages.shift();
         let keepPushing = this.push(nextMessage);
         if (nextMessage === null || (!keepPushing)) {
           return;
@@ -270,19 +343,19 @@ export class Http2CallStream extends stream.Duplex implements CallStream {
 
   // Encode a message to the wire format
   private encodeMessage(message: WriteObject): Buffer {
-    /* unsafeAlloc doesn't initiate the bytes in the buffer. We are explicitly
+    /* allocUnsafe doesn't initiate the bytes in the buffer. We are explicitly
      * overwriting every single byte, so that should be fine */
-    let output: Buffer = Buffer.unsafeAlloc(message.length + 5);
+    let output: Buffer = Buffer.allocUnsafe(message.message.length + 5);
     // TODO(murgatroid99): handle compressed flag appropriately
     output.writeUInt8(0, 0);
-    output.writeUint32BE(message.message.length, 1);
+    output.writeUInt32BE(message.message.length, 1);
     message.message.copy(output, 5);
     return output;
   }
 
   _write(chunk: WriteObject, encoding: string, cb: Function) {
     // TODO(murgatroid99): Add send message filters
-    let encodedMessage = encodeMessage(chunk);
+    let encodedMessage = this.encodeMessage(chunk);
     if (this.http2Stream === null) {
       this.pendingWrite = encodedMessage;
       this.pendingWriteCallback = cb;
diff --git a/src/channel-credentials.ts b/src/channel-credentials.ts
index 975ff310..9a34fc0d 100644
--- a/src/channel-credentials.ts
+++ b/src/channel-credentials.ts
@@ -18,8 +18,8 @@ export interface ChannelCredentials {
   /**
    * Gets the set of per-call credentials associated with this instance.
    */
-  getCallCredentials() : CallCredentials | null;
-  
+  getCallCredentials() : CallCredentials;
+
   /**
    * Gets a SecureContext object generated from input parameters if this
    * instance was created with createSsl, or null if this instance was created
@@ -62,15 +62,15 @@ export namespace ChannelCredentials {
 
 
 abstract class ChannelCredentialsImpl implements ChannelCredentials {
-  protected callCredentials: CallCredentials | null;
+  protected callCredentials: CallCredentials;
 
   protected constructor(callCredentials?: CallCredentials) {
-    this.callCredentials = callCredentials || null;
+    this.callCredentials = callCredentials || CallCredentials.createEmpty();
   }
 
   abstract compose(callCredentials: CallCredentials) : ChannelCredentialsImpl;
 
-  getCallCredentials() : CallCredentials | null {
+  getCallCredentials() : CallCredentials {
     return this.callCredentials;
   }
 
@@ -83,10 +83,7 @@ class InsecureChannelCredentialsImpl extends ChannelCredentialsImpl {
   }
 
   compose(callCredentials: CallCredentials) : ChannelCredentialsImpl {
-    const combinedCallCredentials = this.callCredentials ?
-      this.callCredentials.compose(callCredentials) :
-      callCredentials;
-    return new InsecureChannelCredentialsImpl(combinedCallCredentials);
+    throw new Error("Cannot compose insecure credentials");
   }
 
   getSecureContext() : SecureContext | null {
@@ -106,9 +103,8 @@ class SecureChannelCredentialsImpl extends ChannelCredentialsImpl {
   }
 
   compose(callCredentials: CallCredentials) : ChannelCredentialsImpl {
-    const combinedCallCredentials = this.callCredentials ?
-      this.callCredentials.compose(callCredentials) :
-      callCredentials;
+    const combinedCallCredentials =
+      this.callCredentials.compose(callCredentials);
     return new SecureChannelCredentialsImpl(this.secureContext,
       combinedCallCredentials);
   }
diff --git a/src/channel.ts b/src/channel.ts
index 9f6e1d4b..aa3b92fb 100644
--- a/src/channel.ts
+++ b/src/channel.ts
@@ -1,9 +1,9 @@
 import {EventEmitter} from 'events';
 import {SecureContext} from 'tls';
 import * as http2 from 'http2';
-import {IncomingHttpHeaders, OutgoingHttpHeaders} from 'http';
 import * as url from 'url';
-import {CallOptions, CallStream} from './call-stream';
+import {CallOptions, CallStreamOptions, CallStream, Http2CallStream} from './call-stream';
+import {CallCredentials} from './call-credentials';
 import {ChannelCredentials} from './channel-credentials';
 import {Metadata, MetadataObject} from './metadata';
 import {Status} from './constants'
@@ -11,12 +11,13 @@ import {Status} from './constants'
 import {FilterStackFactory} from './filter-stack'
 import {DeadlineFilterFactory} from './deadline-filter'
 import {CallCredentialsFilterFactory} from './call-credentials-filter'
-import {Http2FilterFactory} from './http2-filter'
+import {CompressionFilterFactory} from './compression-filter'
 
 const IDLE_TIMEOUT_MS = 300000;
 
 const {
   HTTP2_HEADER_AUTHORITY,
+  HTTP2_HEADER_CONTENT_TYPE,
   HTTP2_HEADER_METHOD,
   HTTP2_HEADER_PATH,
   HTTP2_HEADER_SCHEME,
@@ -44,8 +45,8 @@ export enum ConnectivityState {
  * by a given address.
  */
 export interface Channel extends EventEmitter {
-  createStream(methodName: string, metadata: OutgoingHttp2Headers, options: CallOptions): CallStream;
-  connect(() => void): void;
+  createStream(methodName: string, metadata: Metadata, options: CallOptions): CallStream;
+  connect(callback: () => void): void;
   getConnectivityState(): ConnectivityState;
   close(): void;
 
@@ -63,8 +64,7 @@ export class Http2Channel extends EventEmitter implements Channel {
   private idleTimerId: NodeJS.Timer | null = null;
   /* For now, we have up to one subchannel, which will exist as long as we are
    * connecting or trying to connect */
-  private subChannel : Http2Session | null;
-  private address : url.Url;
+  private subChannel : http2.ClientHttp2Session | null;
   private filterStackFactory : FilterStackFactory;
 
   private transitionToState(newState: ConnectivityState): void {
@@ -76,7 +76,12 @@ export class Http2Channel extends EventEmitter implements Channel {
 
   private startConnecting(): void {
     this.transitionToState(ConnectivityState.CONNECTING);
-    this.subChannel = http2.connect(address, { secureContext: this.secureContext });
+    let secureContext = this.credentials.getSecureContext();
+    if (secureContext === null) {
+      this.subChannel = http2.connect(this.address);
+    } else {
+      this.subChannel = http2.connect(this.address, {secureContext});
+    }
     this.subChannel.on('connect', () => {
       this.transitionToState(ConnectivityState.READY);
     });
@@ -86,7 +91,10 @@ export class Http2Channel extends EventEmitter implements Channel {
   }
 
   private goIdle(): void {
-    this.subChannel.shutdown({graceful: true});
+    if (this.subChannel !== null) {
+      this.subChannel.shutdown({graceful: true}, () => {});
+      this.subChannel = null;
+    }
     this.transitionToState(ConnectivityState.IDLE);
   }
 
@@ -96,10 +104,11 @@ export class Http2Channel extends EventEmitter implements Channel {
     }
   }
 
-  constructor(private readonly address: url.Url,
+  constructor(private readonly address: url.URL,
               public readonly credentials: ChannelCredentials,
               private readonly options: ChannelOptions) {
-    if (channelCredentials.getSecureContext() === null) {
+    super();
+    if (credentials.getSecureContext() === null) {
       address.protocol = 'http';
     } else {
       address.protocol = 'https';
@@ -111,11 +120,7 @@ export class Http2Channel extends EventEmitter implements Channel {
     ]);
   }
 
-  createStream(methodName: string, metadata: Metadata, options: CallOptions): CallStream {
-    if (this.connectivityState === ConnectivityState.SHUTDOWN) {
-      throw new Error('Channel has been shut down');
-    }
-    let stream: Http2CallStream = new Http2CallStream(methodName, options, this.filterStackFactory);
+  private startHttp2Stream(methodName: string, stream: Http2CallStream, metadata: Metadata) {
     let finalMetadata: Promise<Metadata> = stream.filterStack.sendMetadata(Promise.resolve(metadata));
     this.connect(() => {
       finalMetadata.then((metadataValue) => {
@@ -125,13 +130,37 @@ export class Http2Channel extends EventEmitter implements Channel {
         headers[HTTP2_HEADER_METHOD] = 'POST';
         headers[HTTP2_HEADER_PATH] = methodName;
         headers[HTTP2_HEADER_TE] = 'trailers';
-        if (stream.isOpen()) {
-          stream.attachHttp2Stream(this.subchannel.request(headers));
+        if (stream.getStatus() === null) {
+          if (this.connectivityState === ConnectivityState.READY) {
+            let session: http2.ClientHttp2Session =
+              (this.subChannel as http2.ClientHttp2Session);
+            stream.attachHttp2Stream(session.request(headers));
+          } else {
+            /* In this case, we lost the connection while finalizing metadata.
+             * That should be very unusual */
+            setImmediate(() => {
+              this.startHttp2Stream(methodName, stream, metadata);
+            });
+          }
         }
       }, (error) => {
         stream.cancelWithStatus(Status.UNKNOWN, "Failed to generate metadata");
       });
     });
+  }
+
+  createStream(methodName: string, metadata: Metadata, options: CallOptions): CallStream {
+    if (this.connectivityState === ConnectivityState.SHUTDOWN) {
+      throw new Error('Channel has been shut down');
+    }
+    let finalOptions: CallStreamOptions = {
+      deadline: options.deadline === undefined ? Infinity : options.deadline,
+      credentials: options.credentials === undefined ?
+        CallCredentials.createEmpty() : options.credentials,
+      flags: options.flags === undefined ? 0 : options.flags
+    }
+    let stream: Http2CallStream = new Http2CallStream(methodName, finalOptions, this.filterStackFactory);
+    this.startHttp2Stream(methodName, stream, metadata);
     return stream;
   }
 
@@ -157,6 +186,8 @@ export class Http2Channel extends EventEmitter implements Channel {
       throw new Error('Channel has been shut down');
     }
     this.transitionToState(ConnectivityState.SHUTDOWN);
-    this.subChannel.shutdown({graceful: true});
+    if (this.subChannel !== null) {
+      this.subChannel.shutdown({graceful: true});
+    }
   }
 }
diff --git a/src/client.ts b/src/client.ts
index e289284c..883efbee 100644
--- a/src/client.ts
+++ b/src/client.ts
@@ -1,6 +1,9 @@
+import {once} from 'lodash';
+import {URL} from 'url';
+
 import {ClientDuplexStream, ClientDuplexStreamImpl, ClientReadableStream, ClientReadableStreamImpl, ClientUnaryCall, ClientUnaryCallImpl, ClientWritableStream, ClientWritableStreamImpl, ServiceError, ServiceErrorImpl} from './call';
 import {CallOptions, CallStream, StatusObject, WriteObject} from './call-stream';
-import {Channel, ChannelOptions} from './channel';
+import {Channel, Http2Channel, ChannelOptions} from './channel';
 import {ChannelCredentials} from './channel-credentials';
 import {Status} from './constants';
 import {Metadata} from './metadata';
@@ -21,7 +24,7 @@ export class Client {
     }
     // TODO(murgatroid99): Figure out how to get version number
     // options['grpc.primary_user_agent'] += 'grpc-node/' + version;
-    this.channel = new Channel(address, credentials, options);
+    this.channel = new Http2Channel(new URL(address), credentials, options);
   }
 
   close(): void {
@@ -29,8 +32,27 @@ export class Client {
   }
 
   waitForReady(deadline: Date|number, callback: (error: Error|null) => void):
-      void {
-    throw new Error('waitForReady is not yet implemented');
+  void {
+    let cb : (error: Error|null) => void = once(callback);
+    let callbackCalled: boolean = false;
+    this.channel.connect(() => {
+      cb(null);
+    });
+    if (deadline != Infinity) {
+      let timeout: number;
+      let now: number = (new Date).getTime();
+      if (deadline instanceof Date) {
+        timeout = deadline.getTime() - now;
+      } else {
+        timeout = deadline - now;
+      }
+      if (timeout < 0) {
+        timeout = 0;
+      }
+      setTimeout(() => {
+        cb(new Error('Failed to connect before the deadline'));
+      }, timeout);
+    }
   }
 
   private handleUnaryResponse<ResponseType>(
diff --git a/src/compression-filter.ts b/src/compression-filter.ts
index 83d31692..b010f4c5 100644
--- a/src/compression-filter.ts
+++ b/src/compression-filter.ts
@@ -1,8 +1,12 @@
-import {Filter, BaseFilter} from './filter'
+import {CallStream} from './call-stream'
+import {Channel} from './channel'
+import {Filter, BaseFilter, FilterFactory} from './filter'
 import {Metadata} from './metadata'
 
 export class CompressionFilter extends BaseFilter implements Filter {
-  constructor() {}
+  constructor() {
+    super();
+  }
 
   async sendMetadata(metadata: Promise<Metadata>): Promise<Metadata> {
     let headers: Metadata = await metadata;
@@ -11,7 +15,7 @@ export class CompressionFilter extends BaseFilter implements Filter {
     return headers;
   }
 
-  async receiveMetadata(metadata: Promise<Metadata>): Promise<Metadata {
+  async receiveMetadata(metadata: Promise<Metadata>): Promise<Metadata> {
     let headers: Metadata = await metadata;
     headers.remove('grpc-encoding');
     headers.remove('grpc-accept-encoding');
@@ -19,8 +23,8 @@ export class CompressionFilter extends BaseFilter implements Filter {
   }
 }
 
-export class CompressionFilterFactory<CompressionFilter> {
-  constructor(channel) {}
+export class CompressionFilterFactory implements FilterFactory<CompressionFilter> {
+  constructor(channel: Channel) {}
   createFilter(callStream: CallStream): CompressionFilter {
     return new CompressionFilter();
   }
diff --git a/src/deadline-filter.ts b/src/deadline-filter.ts
index b0dc378e..5796162e 100644
--- a/src/deadline-filter.ts
+++ b/src/deadline-filter.ts
@@ -1,7 +1,10 @@
-import {Filter} from './filter'
+import {CallStream} from './call-stream'
+import {Channel, Http2Channel} from './channel'
+import {Filter, BaseFilter, FilterFactory} from './filter'
 import {Status} from './constants'
+import {Metadata} from './metadata'
 
-const units = [
+const units: [string, number][] = [
   ['m', 1],
   ['S', 1000],
   ['M', 60 * 1000],
@@ -9,16 +12,21 @@ const units = [
 ]
 
 export class DeadlineFilter extends BaseFilter implements Filter {
-  private deadline;
-  constructor(private readonly channel: Channel, private readonly callStream: CallStream) {
-    let deadline = callStream.deadline;
-    this.deadline = deadline;
+  private deadline: number;
+  constructor(private readonly channel: Http2Channel, private readonly callStream: CallStream) {
+    super();
+    let callDeadline = callStream.getDeadline();
+    if (callDeadline instanceof Date) {
+      this.deadline = callDeadline.getTime();
+    } else {
+      this.deadline = callDeadline;
+    }
     let now: number = (new Date()).getTime();
-    let timeout = deadline - now;
+    let timeout = this.deadline - now;
     if (timeout < 0) {
       timeout = 0;
     }
-    if (deadline !== Infinity) {
+    if (this.deadline !== Infinity) {
       setTimeout(() => {
         callStream.cancelWithStatus(Status.DEADLINE_EXCEEDED, 'Deadline exceeded');
       }, timeout);
@@ -42,7 +50,9 @@ export class DeadlineFilter extends BaseFilter implements Filter {
         }
       });
     });
-    (await metadata).set('grpc-timeout', await timeoutString);
+    let finalMetadata = await metadata;
+    finalMetadata.set('grpc-timeout', await timeoutString);
+    return finalMetadata;
   }
 }
 
diff --git a/src/filter-stack.ts b/src/filter-stack.ts
index 28e7dd99..2a402c3d 100644
--- a/src/filter-stack.ts
+++ b/src/filter-stack.ts
@@ -1,26 +1,28 @@
-import {flow, map} from 'lodash';
-import {Filter} from './filter'
+import {flow, flowRight, map} from 'lodash';
+import {Metadata} from './metadata';
+import {CallStream, StatusObject} from './call-stream'
+import {Filter, FilterFactory} from './filter';
 
 export class FilterStack implements Filter {
   constructor(private readonly filters: Filter[]) {}
 
   async sendMetadata(metadata: Promise<Metadata>) {
-    return await flow(map(filters, (filter) => filter.sendMetadata.bind(filter)))(metadata);
+    return await flow(map(this.filters, (filter) => filter.sendMetadata.bind(filter)))(metadata);
   }
 
   async receiveMetadata(metadata: Promise<Metadata>) {
-    return await flowRight(map(filters, (filter) => filter.receiveMetadata.bind(filter)))(metadata);
+    return await flowRight(map(this.filters, (filter) => filter.receiveMetadata.bind(filter)))(metadata);
   }
 
   async receiveTrailers(status: Promise<StatusObject>): Promise<StatusObject> {
-    return await flowRight(map(filters, (filter) => filter.receiveTrailers.bind(filter)))(status);
+    return await flowRight(map(this.filters, (filter) => filter.receiveTrailers.bind(filter)))(status);
   }
 }
 
 export class FilterStackFactory implements FilterFactory<FilterStack> {
-  constructor(private readonly factories: FilterFactory[]) {}
+  constructor(private readonly factories: FilterFactory<any>[]) {}
 
   createFilter(callStream: CallStream): FilterStack {
-    return new FilterStack(map(factories, (factory) => factory.createFilter(callStream)));
+    return new FilterStack(map(this.factories, (factory) => factory.createFilter(callStream)));
   }
 }
diff --git a/src/filter.ts b/src/filter.ts
index 7d6a7a29..058c45c2 100644
--- a/src/filter.ts
+++ b/src/filter.ts
@@ -1,12 +1,12 @@
 import {Metadata} from './metadata'
-import {WriteObject, CallStream} from './call-stream'
+import {StatusObject, CallStream} from './call-stream'
 
 export interface Filter {
-  async sendMetadata(metadata: Promise<Metadata>): Promise<Metadata>;
+  sendMetadata(metadata: Promise<Metadata>): Promise<Metadata>;
 
-  async receiveMetadata(metadata: Promise<Metadata>): Promise<Metadata>;
+  receiveMetadata(metadata: Promise<Metadata>): Promise<Metadata>;
 
-  async receiveTrailers(status: Promise<StatusObject>): Promise<StatusObject>;
+  receiveTrailers(status: Promise<StatusObject>): Promise<StatusObject>;
 }
 
 export abstract class BaseFilter {
diff --git a/src/metadata.ts b/src/metadata.ts
index e9e71208..979d1230 100644
--- a/src/metadata.ts
+++ b/src/metadata.ts
@@ -65,7 +65,8 @@ function validate(key: string, value?: MetadataValue): void {
  * A class for storing metadata. Keys are normalized to lowercase ASCII.
  */
 export class Metadata {
-  constructor(protected readonly internalRepr: MetadataObject = {}) {}
+  private internalRepr: MetadataObject;
+  constructor() {}
 
   /**
    * Sets the given value for the given key by replacing any other values
@@ -145,7 +146,9 @@ export class Metadata {
    * @return The newly cloned object.
    */
   clone(): Metadata {
-    return new Metadata(cloneMetadataObject(this.internalRepr));
+    let newMetadata = new Metadata();
+    newMetadata.internalRepr = cloneMetadataObject(this.internalRepr);
+    return newMetadata;
   }
 
   /**