Finished initial Channel and CallStream implementations

This commit is contained in:
murgatroid99 2017-08-24 18:01:40 -07:00
parent a6e3597697
commit 4328808943
12 changed files with 310 additions and 158 deletions

View File

@ -14,24 +14,23 @@
}, },
"license": "Apache-2.0", "license": "Apache-2.0",
"devDependencies": { "devDependencies": {
"@types/mocha": "^2.2.41", "@types/mocha": "^2.2.42",
"@types/node": "^8.0.19", "@types/node": "^8.0.25",
"clang-format": "^1.0.53", "clang-format": "^1.0.53",
"del": "^3.0.0", "del": "^3.0.0",
"google-ts-style": "latest", "google-ts-style": "^0.2.0",
"gulp": "^3.9.1", "gulp": "^3.9.1",
"gulp-help": "^1.6.1", "gulp-help": "^1.6.1",
"gulp-mocha": "^4.3.1", "gulp-mocha": "^4.3.1",
"gulp-sourcemaps": "^2.6.0", "gulp-sourcemaps": "^2.6.1",
"gulp-tslint": "^8.1.1", "gulp-tslint": "^8.1.1",
"gulp-typescript": "^3.2.1", "gulp-typescript": "^3.2.2",
"gulp-util": "^3.0.8", "gulp-util": "^3.0.8",
"h2-types": "git+https://github.com/kjin/node-h2-types.git",
"merge2": "^1.1.0", "merge2": "^1.1.0",
"mocha": "^3.5.0", "mocha": "^3.5.0",
"through2": "^2.0.3", "through2": "^2.0.3",
"tslint": "^5.5.0", "tslint": "^5.5.0",
"typescript": "^2.4.1" "typescript": "^2.5.1"
}, },
"contributors": [ "contributors": [
{ {
@ -48,9 +47,7 @@
"test": "gulp test" "test": "gulp test"
}, },
"dependencies": { "dependencies": {
"@types/async": "^2.0.41",
"@types/lodash": "^4.14.73", "@types/lodash": "^4.14.73",
"async": "^2.5.0",
"lodash": "^4.17.4" "lodash": "^4.17.4"
} }
} }

View File

@ -1,28 +1,32 @@
import {promisify} from 'util' import {promisify} from 'util'
import {Filter} from './filter' import {Filter, BaseFilter, FilterFactory} from './filter'
import {CallCredentials} from './call-credentials' import {CallCredentials} from './call-credentials'
import {Http2Channel} from './channel'
import {CallStream} from './call-stream'
import {Metadata} from './metadata'
export class CallCredentialsFilter extends BaseFilter implements Filter { export class CallCredentialsFilter extends BaseFilter implements Filter {
private credsMetadata: Promise<Metadata>; constructor(private readonly credentials: CallCredentials) {
super();
constructor(credentials: CallCredentials) {
// TODO(murgatroid99): pass real options to generateMetadata
credsMetadata = util.promisify(credentials.generateMetadata.bind(credentials))({});
} }
async sendMetadata(metadata: Promise<Metadata>) { async sendMetadata(metadata: Promise<Metadata>): Promise<Metadata> {
return (await metadata).merge(await this.credsMetadata); // TODO(murgatroid99): pass real options to generateMetadata
let credsMetadata = this.credentials.generateMetadata.bind({});
let resultMetadata = await metadata;
resultMetadata.merge(await credsMetadata);
return resultMetadata;
} }
} }
export class CallCredentialsFilterFactory implements FilterFactory<CallCredentialsFilter> { export class CallCredentialsFilterFactory implements FilterFactory<CallCredentialsFilter> {
private credentials: CallCredentials | null; private readonly credentials: CallCredentials;
constructor(channel: Http2Channel) { constructor(channel: Http2Channel) {
this.credentials = channel.credentials.getCallCredentials(); this.credentials = channel.credentials.getCallCredentials();
} }
createFilter(callStream: CallStream): CallCredentialsFilter { createFilter(callStream: CallStream): CallCredentialsFilter {
return new CallCredentialsFilter(this.credentials.compose(callStream.credentials)); return new CallCredentialsFilter(this.credentials.compose(callStream.getCredentials()));
} }
} }

View File

@ -1,10 +1,10 @@
import { Metadata } from './metadata'; import { Metadata } from './metadata';
import * as async from 'async'; import {map, reduce} from 'lodash'
export type CallMetadataGenerator = ( export type CallMetadataGenerator = (
options: Object, options: Object,
cb: (err: Error | null, metadata?: Metadata) => void cb: (err: Error | null, metadata?: Metadata) => void
) => void ) => void;
/** /**
* A class that represents a generic method of adding authentication-related * A class that represents a generic method of adding authentication-related
@ -14,17 +14,15 @@ export interface CallCredentials {
/** /**
* Asynchronously generates a new Metadata object. * Asynchronously generates a new Metadata object.
* @param options Options used in generating the Metadata object. * @param options Options used in generating the Metadata object.
* @param cb A callback of the form (err, metadata) which will be called with
* either the generated metadata, or an error if one occurred.
*/ */
generateMetadata: CallMetadataGenerator; generateMetadata(options: Object): Promise<Metadata>;
/** /**
* Creates a new CallCredentials object from properties of both this and * Creates a new CallCredentials object from properties of both this and
* another CallCredentials object. This object's metadata generator will be * another CallCredentials object. This object's metadata generator will be
* called first. * called first.
* @param callCredentials The other CallCredentials object. * @param callCredentials The other CallCredentials object.
*/ */
compose: (callCredentials: CallCredentials) => CallCredentials; compose(callCredentials: CallCredentials): CallCredentials;
} }
export namespace CallCredentials { export namespace CallCredentials {
@ -38,48 +36,60 @@ export namespace CallCredentials {
export function createFromMetadataGenerator( export function createFromMetadataGenerator(
metadataGenerator: CallMetadataGenerator metadataGenerator: CallMetadataGenerator
): CallCredentials { ): CallCredentials {
return new CallCredentialsImpl([metadataGenerator]); return new SingleCallCredentials(metadataGenerator);
}
export function createEmpty(): CallCredentials {
return new EmptyCallCredentials();
} }
} }
class ComposedCallCredentials implements CallCredentials {
constructor(private creds: CallCredentials[]) {}
class CallCredentialsImpl { async generateMetadata(options: Object): Promise<Metadata> {
constructor(private metadataGenerators: Array<CallMetadataGenerator>) {} let base: Metadata = new Metadata();
let generated: Metadata[] = await Promise.all(map(
generateMetadata( this.creds, (cred) => cred.generateMetadata(options)));
options: Object, for (let gen of generated) {
cb: (err: Error | null, metadata?: Metadata) => void base.merge(gen);
): void {
if (this.metadataGenerators.length === 1) {
this.metadataGenerators[0](options, cb);
return;
} }
return base;
}
const tasks: Array<AsyncFunction<Metadata, Error>> = compose(other: CallCredentials): CallCredentials {
this.metadataGenerators.map(fn => fn.bind(null, options)); return new ComposedCallCredentials(this.creds.concat([other]));
const callback: AsyncResultArrayCallback<Metadata, Error> = }
(err, metadataArray) => { }
if (err || !metadataArray) {
cb(err || new Error('Unknown error')); class SingleCallCredentials implements CallCredentials{
return; constructor(private metadataGenerator: CallMetadataGenerator) {}
async generateMetadata(options: Object): Promise<Metadata> {
return new Promise<Metadata>((resolve, reject) => {
this.metadataGenerator(options, (err, metadata) => {
if (metadata !== undefined) {
resolve(metadata);
} else { } else {
const result: Metadata = new Metadata(); reject(err);
metadataArray.forEach((metadata) => {
if (metadata) {
result.merge(metadata);
}
});
cb(null, result);
} }
}; });
async.parallel(tasks, callback); });
} }
compose(callCredentials: CallCredentials): CallCredentials { compose(other: CallCredentials): CallCredentials {
if (!(callCredentials instanceof CallCredentialsImpl)) { return new ComposedCallCredentials([this, other]);
throw new Error('Unknown CallCredentials implementation provided'); }
} }
return new CallCredentialsImpl(this.metadataGenerators.concat(
(callCredentials as CallCredentialsImpl).metadataGenerators)); class EmptyCallCredentials implements CallCredentials {
constructor () {}
async generateMetadata(options: Object): Promise<Metadata> {
return new Metadata();
}
compose(other:CallCredentials): CallCredentials {
return other;
} }
} }

View File

@ -1,5 +1,7 @@
import * as stream from 'stream'; import * as stream from 'stream';
import * as http2 from 'http2';
import {CallCredentials} from './call-credentials'; import {CallCredentials} from './call-credentials';
import {Status} from './constants'; import {Status} from './constants';
import {Metadata} from './metadata'; import {Metadata} from './metadata';
@ -7,13 +9,21 @@ import {ObjectDuplex} from './object-stream';
import {Filter} from './filter' import {Filter} from './filter'
import {FilterStackFactory} from './filter-stack' import {FilterStackFactory} from './filter-stack'
export interface CallOptions { const {
deadline?: Date|number; HTTP2_HEADER_STATUS,
host?: string; HTTP2_HEADER_CONTENT_TYPE
credentials?: CallCredentials; } = http2.constants;
flags?: number;
export type Deadline = Date | number;
export interface CallStreamOptions {
deadline: Deadline;
credentials: CallCredentials;
flags: number;
} }
export type CallOptions = Partial<CallStreamOptions>;
export interface StatusObject { export interface StatusObject {
code: Status; code: Status;
details: string; details: string;
@ -32,6 +42,12 @@ export interface CallStream extends ObjectDuplex<WriteObject, Buffer> {
cancelWithStatus(status: Status, details: string): void; cancelWithStatus(status: Status, details: string): void;
getPeer(): string; getPeer(): string;
getDeadline(): Deadline;
getCredentials(): CallCredentials;
/* If the return value is null, the call has not ended yet. Otherwise, it has
* ended with the specified status */
getStatus(): StatusObject | null;
addListener(event: string, listener: Function): this; addListener(event: string, listener: Function): this;
emit(event: string|symbol, ...args: any[]): boolean; emit(event: string|symbol, ...args: any[]): boolean;
on(event: string, listener: Function): this; on(event: string, listener: Function): this;
@ -71,16 +87,16 @@ enum ReadState {
export class Http2CallStream extends stream.Duplex implements CallStream { export class Http2CallStream extends stream.Duplex implements CallStream {
private filterStack: Filter; public filterStack: Filter;
private statusEmitted: bool = false; private statusEmitted: boolean = false;
private http2Stream: ClientHttp2Stream | null = null; private http2Stream: http2.ClientHttp2Stream | null = null;
private pendingRead: bool = false; private pendingRead: boolean = false;
private pendingWrite: Buffer | null = null; private pendingWrite: Buffer | null = null;
private pendingWriteCallback: Function | null = null; private pendingWriteCallback: Function | null = null;
private pendingFinalCallback: Function | null = null; private pendingFinalCallback: Function | null = null;
private readState: ReadState = ReadState.NO_DATA; private readState: ReadState = ReadState.NO_DATA;
private readCompressFlag: bool = false; private readCompressFlag: boolean = false;
private readPartialSize: Buffer = Buffer.alloc(4); private readPartialSize: Buffer = Buffer.alloc(4);
private readSizeRemaining: number = 4; private readSizeRemaining: number = 4;
private readMessageSize: number = 0; private readMessageSize: number = 0;
@ -92,21 +108,26 @@ export class Http2CallStream extends stream.Duplex implements CallStream {
// Status code mapped from :status. To be used if grpc-status is not received // Status code mapped from :status. To be used if grpc-status is not received
private mappedStatusCode: Status = Status.UNKNOWN; private mappedStatusCode: Status = Status.UNKNOWN;
constructor(public readonly methodName: string, public readonly options: CallOptions, // This is populated (non-null) if and only if the call has ended
private finalStatus: StatusObject | null = null;
constructor(private readonly methodName: string,
private readonly options: CallStreamOptions,
filterStackFactory: FilterStackFactory) { filterStackFactory: FilterStackFactory) {
this.filterStack = FilterStackFactory.createFilter(this); super({objectMode: true});
this.filterStack = filterStackFactory.createFilter(this);
} }
private endCall(status: StatusObject): void { private endCall(status: StatusObject): void {
if (!this.statusEmitted) { if (!this.finalStatus === null) {
this.emit('status', {code: status, details: details, metadata: new Metadata()}); this.finalStatus = status;
this.statusEmitted = true; this.emit('status', status);
} }
} }
attachHttp2Stream(stream: ClientHttp2Stream): void { attachHttp2Stream(stream: http2.ClientHttp2Stream): void {
if (this.statusEmitted) { if (this.finalStatus !== null) {
// TODO(murgatroid99): Handle call end before http2 stream start stream.rstWithCancel();
} else { } else {
this.http2Stream = stream; this.http2Stream = stream;
stream.on('response', (headers) => { stream.on('response', (headers) => {
@ -133,8 +154,8 @@ export class Http2CallStream extends stream.Duplex implements CallStream {
default: default:
this.mappedStatusCode = Status.UNKNOWN; this.mappedStatusCode = Status.UNKNOWN;
} }
delete headers[HTTP2_HEADERS_STATUS]; delete headers[HTTP2_HEADER_STATUS];
delete headers[HTTP2_HEADERS_CONTENT_TYPE]; delete headers[HTTP2_HEADER_CONTENT_TYPE];
let metadata: Metadata; let metadata: Metadata;
try { try {
metadata = Metadata.fromHttp2Headers(headers); metadata = Metadata.fromHttp2Headers(headers);
@ -152,7 +173,7 @@ export class Http2CallStream extends stream.Duplex implements CallStream {
let code: Status = this.mappedStatusCode; let code: Status = this.mappedStatusCode;
if (headers.hasOwnProperty('grpc-status')) { if (headers.hasOwnProperty('grpc-status')) {
let receivedCode = Number(headers['grpc-status']); let receivedCode = Number(headers['grpc-status']);
if (possibleCode in Status) { if (receivedCode in Status) {
code = receivedCode; code = receivedCode;
} else { } else {
code = Status.UNKNOWN; code = Status.UNKNOWN;
@ -169,23 +190,29 @@ export class Http2CallStream extends stream.Duplex implements CallStream {
} catch (e) { } catch (e) {
metadata = new Metadata(); metadata = new Metadata();
} }
let status: StatusObject = {
code: code,
details: details,
metadata: metadata
};
this.filterStack.receiveTrailers(Promise.resolve(status)).then((finalStatus) => { this.filterStack.receiveTrailers(Promise.resolve(status)).then((finalStatus) => {
this.endCall(finalStatus); this.endCall(finalStatus);
}, (error) => { }, (error) => {
this.endCall({ this.endCall({
code: Status.INTERNAL, code: Status.INTERNAL,
details: 'Failed to process received status', details: 'Failed to process received status',
metadata: new Metadata(); metadata: new Metadata()
}); });
}); });
}); });
stream.on('read', (data) => { stream.on('read', (data) => {
let readHead = 0; let readHead = 0;
let canPush = true; let canPush = true;
let toRead: number;
while (readHead < data.length) { while (readHead < data.length) {
switch(this.readState) { switch(this.readState) {
case ReadState.NO_DATA: case ReadState.NO_DATA:
readCompressFlag = (data.readUInt8(readHead) !== 0); this.readCompressFlag = (data.readUInt8(readHead) !== 0);
this.readState = ReadState.READING_SIZE; this.readState = ReadState.READING_SIZE;
this.readPartialSize.fill(0); this.readPartialSize.fill(0);
this.readSizeRemaining = 4; this.readSizeRemaining = 4;
@ -194,31 +221,31 @@ export class Http2CallStream extends stream.Duplex implements CallStream {
this.readPartialMessage = []; this.readPartialMessage = [];
break; break;
case ReadState.READING_SIZE: case ReadState.READING_SIZE:
let toRead: number = Math.min(data.length - readHead, this.readSizeRemaining); toRead = Math.min(data.length - readHead, this.readSizeRemaining);
data.copy(readPartialSize, 4 - this.readSizeRemaining, readHead, readHead + toRead); data.copy(this.readPartialSize, 4 - this.readSizeRemaining, readHead, readHead + toRead);
this.readSizeRemaining -= toRead; this.readSizeRemaining -= toRead;
readHead += toRead; readHead += toRead;
// readSizeRemaining >=0 here // readSizeRemaining >=0 here
if (this.readSizeRemaining === 0) { if (this.readSizeRemaining === 0) {
this.readMessageSize = readPartialSize.readUInt32BE(0); this.readMessageSize = this.readPartialSize.readUInt32BE(0);
this.readMessageRemaining = this.readMessageSize; this.readMessageRemaining = this.readMessageSize;
this.readState = ReadState.READING_MESSAGE; this.readState = ReadState.READING_MESSAGE;
} }
break; break;
case ReadSize.READING_MESSAGE: case ReadState.READING_MESSAGE:
let toRead: number = math.min(data.length - readHead, this.readMessageRemaining); toRead = Math.min(data.length - readHead, this.readMessageRemaining);
readPartialMessage.push(data.slice(readHead, readHead + toRead)); this.readPartialMessage.push(data.slice(readHead, readHead + toRead));
this.readMessageRemaining -= toRead; this.readMessageRemaining -= toRead;
this.readHead += toRead; readHead += toRead;
// readMessageRemaining >=0 here // readMessageRemaining >=0 here
if (this.readMessageRemaining === 0) { if (this.readMessageRemaining === 0) {
// At this point, we have read a full message // At this point, we have read a full message
let messageBytes = Buffer.concat(readPartialMessage, readMessageSize); let messageBytes = Buffer.concat(this.readPartialMessage, this.readMessageSize);
// TODO(murgatroid99): Add receive message filters // TODO(murgatroid99): Add receive message filters
if (canPush) { if (canPush) {
if (!this.push(messageBytes)) { if (!this.push(messageBytes)) {
canPush = false; canPush = false;
this.http2Stream.pause(); (this.http2Stream as http2.ClientHttp2Stream).pause();
} }
} else { } else {
this.unpushedReadMessages.push(messageBytes); this.unpushedReadMessages.push(messageBytes);
@ -234,6 +261,40 @@ export class Http2CallStream extends stream.Duplex implements CallStream {
this.unpushedReadMessages.push(null); this.unpushedReadMessages.push(null);
} }
}); });
stream.on('streamClosed', (errorCode) => {
let code: Status;
let details: string = '';
switch(errorCode) {
case http2.constants.NGHTTP2_REFUSED_STREAM:
code = Status.UNAVAILABLE;
break;
case http2.constants.NGHTTP2_CANCEL:
code = Status.CANCELLED;
break;
case http2.constants.NGHTTP2_ENHANCE_YOUR_CALM:
code = Status.RESOURCE_EXHAUSTED;
details = 'Bandwidth exhausted';
break;
case http2.constants.NGHTTP2_INADEQUATE_SECURITY:
code = Status.PERMISSION_DENIED;
details = 'Protocol not secure enough';
break;
default:
code = Status.INTERNAL;
}
this.endCall({
code: code,
details: details,
metadata: new Metadata()
});
});
stream.on('error', () => {
this.endCall({
code: Status.INTERNAL,
details: 'Internal HTTP2 error',
metadata: new Metadata()
});
});
} }
} }
@ -246,6 +307,18 @@ export class Http2CallStream extends stream.Duplex implements CallStream {
} }
} }
getDeadline(): Deadline {
return this.options.deadline;
}
getCredentials(): CallCredentials {
return this.options.credentials;
}
getStatus(): StatusObject | null {
return this.finalStatus;
}
getPeer(): string { getPeer(): string {
throw new Error('Not yet implemented'); throw new Error('Not yet implemented');
} }
@ -254,8 +327,8 @@ export class Http2CallStream extends stream.Duplex implements CallStream {
if (this.http2Stream === null) { if (this.http2Stream === null) {
this.pendingRead = true; this.pendingRead = true;
} else { } else {
while (unpushedReadMessages.length > 0) { while (this.unpushedReadMessages.length > 0) {
let nextMessage = unpushedReadMessages.shift(); let nextMessage = this.unpushedReadMessages.shift();
let keepPushing = this.push(nextMessage); let keepPushing = this.push(nextMessage);
if (nextMessage === null || (!keepPushing)) { if (nextMessage === null || (!keepPushing)) {
return; return;
@ -270,19 +343,19 @@ export class Http2CallStream extends stream.Duplex implements CallStream {
// Encode a message to the wire format // Encode a message to the wire format
private encodeMessage(message: WriteObject): Buffer { private encodeMessage(message: WriteObject): Buffer {
/* unsafeAlloc doesn't initiate the bytes in the buffer. We are explicitly /* allocUnsafe doesn't initiate the bytes in the buffer. We are explicitly
* overwriting every single byte, so that should be fine */ * overwriting every single byte, so that should be fine */
let output: Buffer = Buffer.unsafeAlloc(message.length + 5); let output: Buffer = Buffer.allocUnsafe(message.message.length + 5);
// TODO(murgatroid99): handle compressed flag appropriately // TODO(murgatroid99): handle compressed flag appropriately
output.writeUInt8(0, 0); output.writeUInt8(0, 0);
output.writeUint32BE(message.message.length, 1); output.writeUInt32BE(message.message.length, 1);
message.message.copy(output, 5); message.message.copy(output, 5);
return output; return output;
} }
_write(chunk: WriteObject, encoding: string, cb: Function) { _write(chunk: WriteObject, encoding: string, cb: Function) {
// TODO(murgatroid99): Add send message filters // TODO(murgatroid99): Add send message filters
let encodedMessage = encodeMessage(chunk); let encodedMessage = this.encodeMessage(chunk);
if (this.http2Stream === null) { if (this.http2Stream === null) {
this.pendingWrite = encodedMessage; this.pendingWrite = encodedMessage;
this.pendingWriteCallback = cb; this.pendingWriteCallback = cb;

View File

@ -18,8 +18,8 @@ export interface ChannelCredentials {
/** /**
* Gets the set of per-call credentials associated with this instance. * Gets the set of per-call credentials associated with this instance.
*/ */
getCallCredentials() : CallCredentials | null; getCallCredentials() : CallCredentials;
/** /**
* Gets a SecureContext object generated from input parameters if this * Gets a SecureContext object generated from input parameters if this
* instance was created with createSsl, or null if this instance was created * instance was created with createSsl, or null if this instance was created
@ -62,15 +62,15 @@ export namespace ChannelCredentials {
abstract class ChannelCredentialsImpl implements ChannelCredentials { abstract class ChannelCredentialsImpl implements ChannelCredentials {
protected callCredentials: CallCredentials | null; protected callCredentials: CallCredentials;
protected constructor(callCredentials?: CallCredentials) { protected constructor(callCredentials?: CallCredentials) {
this.callCredentials = callCredentials || null; this.callCredentials = callCredentials || CallCredentials.createEmpty();
} }
abstract compose(callCredentials: CallCredentials) : ChannelCredentialsImpl; abstract compose(callCredentials: CallCredentials) : ChannelCredentialsImpl;
getCallCredentials() : CallCredentials | null { getCallCredentials() : CallCredentials {
return this.callCredentials; return this.callCredentials;
} }
@ -83,10 +83,7 @@ class InsecureChannelCredentialsImpl extends ChannelCredentialsImpl {
} }
compose(callCredentials: CallCredentials) : ChannelCredentialsImpl { compose(callCredentials: CallCredentials) : ChannelCredentialsImpl {
const combinedCallCredentials = this.callCredentials ? throw new Error("Cannot compose insecure credentials");
this.callCredentials.compose(callCredentials) :
callCredentials;
return new InsecureChannelCredentialsImpl(combinedCallCredentials);
} }
getSecureContext() : SecureContext | null { getSecureContext() : SecureContext | null {
@ -106,9 +103,8 @@ class SecureChannelCredentialsImpl extends ChannelCredentialsImpl {
} }
compose(callCredentials: CallCredentials) : ChannelCredentialsImpl { compose(callCredentials: CallCredentials) : ChannelCredentialsImpl {
const combinedCallCredentials = this.callCredentials ? const combinedCallCredentials =
this.callCredentials.compose(callCredentials) : this.callCredentials.compose(callCredentials);
callCredentials;
return new SecureChannelCredentialsImpl(this.secureContext, return new SecureChannelCredentialsImpl(this.secureContext,
combinedCallCredentials); combinedCallCredentials);
} }

View File

@ -1,9 +1,9 @@
import {EventEmitter} from 'events'; import {EventEmitter} from 'events';
import {SecureContext} from 'tls'; import {SecureContext} from 'tls';
import * as http2 from 'http2'; import * as http2 from 'http2';
import {IncomingHttpHeaders, OutgoingHttpHeaders} from 'http';
import * as url from 'url'; import * as url from 'url';
import {CallOptions, CallStream} from './call-stream'; import {CallOptions, CallStreamOptions, CallStream, Http2CallStream} from './call-stream';
import {CallCredentials} from './call-credentials';
import {ChannelCredentials} from './channel-credentials'; import {ChannelCredentials} from './channel-credentials';
import {Metadata, MetadataObject} from './metadata'; import {Metadata, MetadataObject} from './metadata';
import {Status} from './constants' import {Status} from './constants'
@ -11,12 +11,13 @@ import {Status} from './constants'
import {FilterStackFactory} from './filter-stack' import {FilterStackFactory} from './filter-stack'
import {DeadlineFilterFactory} from './deadline-filter' import {DeadlineFilterFactory} from './deadline-filter'
import {CallCredentialsFilterFactory} from './call-credentials-filter' import {CallCredentialsFilterFactory} from './call-credentials-filter'
import {Http2FilterFactory} from './http2-filter' import {CompressionFilterFactory} from './compression-filter'
const IDLE_TIMEOUT_MS = 300000; const IDLE_TIMEOUT_MS = 300000;
const { const {
HTTP2_HEADER_AUTHORITY, HTTP2_HEADER_AUTHORITY,
HTTP2_HEADER_CONTENT_TYPE,
HTTP2_HEADER_METHOD, HTTP2_HEADER_METHOD,
HTTP2_HEADER_PATH, HTTP2_HEADER_PATH,
HTTP2_HEADER_SCHEME, HTTP2_HEADER_SCHEME,
@ -44,8 +45,8 @@ export enum ConnectivityState {
* by a given address. * by a given address.
*/ */
export interface Channel extends EventEmitter { export interface Channel extends EventEmitter {
createStream(methodName: string, metadata: OutgoingHttp2Headers, options: CallOptions): CallStream; createStream(methodName: string, metadata: Metadata, options: CallOptions): CallStream;
connect(() => void): void; connect(callback: () => void): void;
getConnectivityState(): ConnectivityState; getConnectivityState(): ConnectivityState;
close(): void; close(): void;
@ -63,8 +64,7 @@ export class Http2Channel extends EventEmitter implements Channel {
private idleTimerId: NodeJS.Timer | null = null; private idleTimerId: NodeJS.Timer | null = null;
/* For now, we have up to one subchannel, which will exist as long as we are /* For now, we have up to one subchannel, which will exist as long as we are
* connecting or trying to connect */ * connecting or trying to connect */
private subChannel : Http2Session | null; private subChannel : http2.ClientHttp2Session | null;
private address : url.Url;
private filterStackFactory : FilterStackFactory; private filterStackFactory : FilterStackFactory;
private transitionToState(newState: ConnectivityState): void { private transitionToState(newState: ConnectivityState): void {
@ -76,7 +76,12 @@ export class Http2Channel extends EventEmitter implements Channel {
private startConnecting(): void { private startConnecting(): void {
this.transitionToState(ConnectivityState.CONNECTING); this.transitionToState(ConnectivityState.CONNECTING);
this.subChannel = http2.connect(address, { secureContext: this.secureContext }); let secureContext = this.credentials.getSecureContext();
if (secureContext === null) {
this.subChannel = http2.connect(this.address);
} else {
this.subChannel = http2.connect(this.address, {secureContext});
}
this.subChannel.on('connect', () => { this.subChannel.on('connect', () => {
this.transitionToState(ConnectivityState.READY); this.transitionToState(ConnectivityState.READY);
}); });
@ -86,7 +91,10 @@ export class Http2Channel extends EventEmitter implements Channel {
} }
private goIdle(): void { private goIdle(): void {
this.subChannel.shutdown({graceful: true}); if (this.subChannel !== null) {
this.subChannel.shutdown({graceful: true}, () => {});
this.subChannel = null;
}
this.transitionToState(ConnectivityState.IDLE); this.transitionToState(ConnectivityState.IDLE);
} }
@ -96,10 +104,11 @@ export class Http2Channel extends EventEmitter implements Channel {
} }
} }
constructor(private readonly address: url.Url, constructor(private readonly address: url.URL,
public readonly credentials: ChannelCredentials, public readonly credentials: ChannelCredentials,
private readonly options: ChannelOptions) { private readonly options: ChannelOptions) {
if (channelCredentials.getSecureContext() === null) { super();
if (credentials.getSecureContext() === null) {
address.protocol = 'http'; address.protocol = 'http';
} else { } else {
address.protocol = 'https'; address.protocol = 'https';
@ -111,11 +120,7 @@ export class Http2Channel extends EventEmitter implements Channel {
]); ]);
} }
createStream(methodName: string, metadata: Metadata, options: CallOptions): CallStream { private startHttp2Stream(methodName: string, stream: Http2CallStream, metadata: Metadata) {
if (this.connectivityState === ConnectivityState.SHUTDOWN) {
throw new Error('Channel has been shut down');
}
let stream: Http2CallStream = new Http2CallStream(methodName, options, this.filterStackFactory);
let finalMetadata: Promise<Metadata> = stream.filterStack.sendMetadata(Promise.resolve(metadata)); let finalMetadata: Promise<Metadata> = stream.filterStack.sendMetadata(Promise.resolve(metadata));
this.connect(() => { this.connect(() => {
finalMetadata.then((metadataValue) => { finalMetadata.then((metadataValue) => {
@ -125,13 +130,37 @@ export class Http2Channel extends EventEmitter implements Channel {
headers[HTTP2_HEADER_METHOD] = 'POST'; headers[HTTP2_HEADER_METHOD] = 'POST';
headers[HTTP2_HEADER_PATH] = methodName; headers[HTTP2_HEADER_PATH] = methodName;
headers[HTTP2_HEADER_TE] = 'trailers'; headers[HTTP2_HEADER_TE] = 'trailers';
if (stream.isOpen()) { if (stream.getStatus() === null) {
stream.attachHttp2Stream(this.subchannel.request(headers)); if (this.connectivityState === ConnectivityState.READY) {
let session: http2.ClientHttp2Session =
(this.subChannel as http2.ClientHttp2Session);
stream.attachHttp2Stream(session.request(headers));
} else {
/* In this case, we lost the connection while finalizing metadata.
* That should be very unusual */
setImmediate(() => {
this.startHttp2Stream(methodName, stream, metadata);
});
}
} }
}, (error) => { }, (error) => {
stream.cancelWithStatus(Status.UNKNOWN, "Failed to generate metadata"); stream.cancelWithStatus(Status.UNKNOWN, "Failed to generate metadata");
}); });
}); });
}
createStream(methodName: string, metadata: Metadata, options: CallOptions): CallStream {
if (this.connectivityState === ConnectivityState.SHUTDOWN) {
throw new Error('Channel has been shut down');
}
let finalOptions: CallStreamOptions = {
deadline: options.deadline === undefined ? Infinity : options.deadline,
credentials: options.credentials === undefined ?
CallCredentials.createEmpty() : options.credentials,
flags: options.flags === undefined ? 0 : options.flags
}
let stream: Http2CallStream = new Http2CallStream(methodName, finalOptions, this.filterStackFactory);
this.startHttp2Stream(methodName, stream, metadata);
return stream; return stream;
} }
@ -157,6 +186,8 @@ export class Http2Channel extends EventEmitter implements Channel {
throw new Error('Channel has been shut down'); throw new Error('Channel has been shut down');
} }
this.transitionToState(ConnectivityState.SHUTDOWN); this.transitionToState(ConnectivityState.SHUTDOWN);
this.subChannel.shutdown({graceful: true}); if (this.subChannel !== null) {
this.subChannel.shutdown({graceful: true});
}
} }
} }

View File

@ -1,6 +1,9 @@
import {once} from 'lodash';
import {URL} from 'url';
import {ClientDuplexStream, ClientDuplexStreamImpl, ClientReadableStream, ClientReadableStreamImpl, ClientUnaryCall, ClientUnaryCallImpl, ClientWritableStream, ClientWritableStreamImpl, ServiceError, ServiceErrorImpl} from './call'; import {ClientDuplexStream, ClientDuplexStreamImpl, ClientReadableStream, ClientReadableStreamImpl, ClientUnaryCall, ClientUnaryCallImpl, ClientWritableStream, ClientWritableStreamImpl, ServiceError, ServiceErrorImpl} from './call';
import {CallOptions, CallStream, StatusObject, WriteObject} from './call-stream'; import {CallOptions, CallStream, StatusObject, WriteObject} from './call-stream';
import {Channel, ChannelOptions} from './channel'; import {Channel, Http2Channel, ChannelOptions} from './channel';
import {ChannelCredentials} from './channel-credentials'; import {ChannelCredentials} from './channel-credentials';
import {Status} from './constants'; import {Status} from './constants';
import {Metadata} from './metadata'; import {Metadata} from './metadata';
@ -21,7 +24,7 @@ export class Client {
} }
// TODO(murgatroid99): Figure out how to get version number // TODO(murgatroid99): Figure out how to get version number
// options['grpc.primary_user_agent'] += 'grpc-node/' + version; // options['grpc.primary_user_agent'] += 'grpc-node/' + version;
this.channel = new Channel(address, credentials, options); this.channel = new Http2Channel(new URL(address), credentials, options);
} }
close(): void { close(): void {
@ -29,8 +32,27 @@ export class Client {
} }
waitForReady(deadline: Date|number, callback: (error: Error|null) => void): waitForReady(deadline: Date|number, callback: (error: Error|null) => void):
void { void {
throw new Error('waitForReady is not yet implemented'); let cb : (error: Error|null) => void = once(callback);
let callbackCalled: boolean = false;
this.channel.connect(() => {
cb(null);
});
if (deadline != Infinity) {
let timeout: number;
let now: number = (new Date).getTime();
if (deadline instanceof Date) {
timeout = deadline.getTime() - now;
} else {
timeout = deadline - now;
}
if (timeout < 0) {
timeout = 0;
}
setTimeout(() => {
cb(new Error('Failed to connect before the deadline'));
}, timeout);
}
} }
private handleUnaryResponse<ResponseType>( private handleUnaryResponse<ResponseType>(

View File

@ -1,8 +1,12 @@
import {Filter, BaseFilter} from './filter' import {CallStream} from './call-stream'
import {Channel} from './channel'
import {Filter, BaseFilter, FilterFactory} from './filter'
import {Metadata} from './metadata' import {Metadata} from './metadata'
export class CompressionFilter extends BaseFilter implements Filter { export class CompressionFilter extends BaseFilter implements Filter {
constructor() {} constructor() {
super();
}
async sendMetadata(metadata: Promise<Metadata>): Promise<Metadata> { async sendMetadata(metadata: Promise<Metadata>): Promise<Metadata> {
let headers: Metadata = await metadata; let headers: Metadata = await metadata;
@ -11,7 +15,7 @@ export class CompressionFilter extends BaseFilter implements Filter {
return headers; return headers;
} }
async receiveMetadata(metadata: Promise<Metadata>): Promise<Metadata { async receiveMetadata(metadata: Promise<Metadata>): Promise<Metadata> {
let headers: Metadata = await metadata; let headers: Metadata = await metadata;
headers.remove('grpc-encoding'); headers.remove('grpc-encoding');
headers.remove('grpc-accept-encoding'); headers.remove('grpc-accept-encoding');
@ -19,8 +23,8 @@ export class CompressionFilter extends BaseFilter implements Filter {
} }
} }
export class CompressionFilterFactory<CompressionFilter> { export class CompressionFilterFactory implements FilterFactory<CompressionFilter> {
constructor(channel) {} constructor(channel: Channel) {}
createFilter(callStream: CallStream): CompressionFilter { createFilter(callStream: CallStream): CompressionFilter {
return new CompressionFilter(); return new CompressionFilter();
} }

View File

@ -1,7 +1,10 @@
import {Filter} from './filter' import {CallStream} from './call-stream'
import {Channel, Http2Channel} from './channel'
import {Filter, BaseFilter, FilterFactory} from './filter'
import {Status} from './constants' import {Status} from './constants'
import {Metadata} from './metadata'
const units = [ const units: [string, number][] = [
['m', 1], ['m', 1],
['S', 1000], ['S', 1000],
['M', 60 * 1000], ['M', 60 * 1000],
@ -9,16 +12,21 @@ const units = [
] ]
export class DeadlineFilter extends BaseFilter implements Filter { export class DeadlineFilter extends BaseFilter implements Filter {
private deadline; private deadline: number;
constructor(private readonly channel: Channel, private readonly callStream: CallStream) { constructor(private readonly channel: Http2Channel, private readonly callStream: CallStream) {
let deadline = callStream.deadline; super();
this.deadline = deadline; let callDeadline = callStream.getDeadline();
if (callDeadline instanceof Date) {
this.deadline = callDeadline.getTime();
} else {
this.deadline = callDeadline;
}
let now: number = (new Date()).getTime(); let now: number = (new Date()).getTime();
let timeout = deadline - now; let timeout = this.deadline - now;
if (timeout < 0) { if (timeout < 0) {
timeout = 0; timeout = 0;
} }
if (deadline !== Infinity) { if (this.deadline !== Infinity) {
setTimeout(() => { setTimeout(() => {
callStream.cancelWithStatus(Status.DEADLINE_EXCEEDED, 'Deadline exceeded'); callStream.cancelWithStatus(Status.DEADLINE_EXCEEDED, 'Deadline exceeded');
}, timeout); }, timeout);
@ -42,7 +50,9 @@ export class DeadlineFilter extends BaseFilter implements Filter {
} }
}); });
}); });
(await metadata).set('grpc-timeout', await timeoutString); let finalMetadata = await metadata;
finalMetadata.set('grpc-timeout', await timeoutString);
return finalMetadata;
} }
} }

View File

@ -1,26 +1,28 @@
import {flow, map} from 'lodash'; import {flow, flowRight, map} from 'lodash';
import {Filter} from './filter' import {Metadata} from './metadata';
import {CallStream, StatusObject} from './call-stream'
import {Filter, FilterFactory} from './filter';
export class FilterStack implements Filter { export class FilterStack implements Filter {
constructor(private readonly filters: Filter[]) {} constructor(private readonly filters: Filter[]) {}
async sendMetadata(metadata: Promise<Metadata>) { async sendMetadata(metadata: Promise<Metadata>) {
return await flow(map(filters, (filter) => filter.sendMetadata.bind(filter)))(metadata); return await flow(map(this.filters, (filter) => filter.sendMetadata.bind(filter)))(metadata);
} }
async receiveMetadata(metadata: Promise<Metadata>) { async receiveMetadata(metadata: Promise<Metadata>) {
return await flowRight(map(filters, (filter) => filter.receiveMetadata.bind(filter)))(metadata); return await flowRight(map(this.filters, (filter) => filter.receiveMetadata.bind(filter)))(metadata);
} }
async receiveTrailers(status: Promise<StatusObject>): Promise<StatusObject> { async receiveTrailers(status: Promise<StatusObject>): Promise<StatusObject> {
return await flowRight(map(filters, (filter) => filter.receiveTrailers.bind(filter)))(status); return await flowRight(map(this.filters, (filter) => filter.receiveTrailers.bind(filter)))(status);
} }
} }
export class FilterStackFactory implements FilterFactory<FilterStack> { export class FilterStackFactory implements FilterFactory<FilterStack> {
constructor(private readonly factories: FilterFactory[]) {} constructor(private readonly factories: FilterFactory<any>[]) {}
createFilter(callStream: CallStream): FilterStack { createFilter(callStream: CallStream): FilterStack {
return new FilterStack(map(factories, (factory) => factory.createFilter(callStream))); return new FilterStack(map(this.factories, (factory) => factory.createFilter(callStream)));
} }
} }

View File

@ -1,12 +1,12 @@
import {Metadata} from './metadata' import {Metadata} from './metadata'
import {WriteObject, CallStream} from './call-stream' import {StatusObject, CallStream} from './call-stream'
export interface Filter { export interface Filter {
async sendMetadata(metadata: Promise<Metadata>): Promise<Metadata>; sendMetadata(metadata: Promise<Metadata>): Promise<Metadata>;
async receiveMetadata(metadata: Promise<Metadata>): Promise<Metadata>; receiveMetadata(metadata: Promise<Metadata>): Promise<Metadata>;
async receiveTrailers(status: Promise<StatusObject>): Promise<StatusObject>; receiveTrailers(status: Promise<StatusObject>): Promise<StatusObject>;
} }
export abstract class BaseFilter { export abstract class BaseFilter {

View File

@ -65,7 +65,8 @@ function validate(key: string, value?: MetadataValue): void {
* A class for storing metadata. Keys are normalized to lowercase ASCII. * A class for storing metadata. Keys are normalized to lowercase ASCII.
*/ */
export class Metadata { export class Metadata {
constructor(protected readonly internalRepr: MetadataObject = {}) {} private internalRepr: MetadataObject;
constructor() {}
/** /**
* Sets the given value for the given key by replacing any other values * Sets the given value for the given key by replacing any other values
@ -145,7 +146,9 @@ export class Metadata {
* @return The newly cloned object. * @return The newly cloned object.
*/ */
clone(): Metadata { clone(): Metadata {
return new Metadata(cloneMetadataObject(this.internalRepr)); let newMetadata = new Metadata();
newMetadata.internalRepr = cloneMetadataObject(this.internalRepr);
return newMetadata;
} }
/** /**