Added most of the channel and call-stream implementations, plus filters

This commit is contained in:
murgatroid99 2017-08-24 12:29:28 -07:00
parent 3c36d7a94c
commit a1c5af652d
7 changed files with 439 additions and 50 deletions

View File

@ -0,0 +1,28 @@
import {promisify} from 'util'
import {Filter} from './filter'
import {CallCredentials} from './call-credentials'
export class CallCredentialsFilter extends BaseFilter implements Filter {
private credsMetadata: Promise<Metadata>;
constructor(credentials: CallCredentials) {
// TODO(murgatroid99): pass real options to generateMetadata
credsMetadata = util.promisify(credentials.generateMetadata.bind(credentials))({});
}
async sendMetadata(metadata: Promise<Metadata>) {
return (await metadata).merge(await this.credsMetadata);
}
}
export class CallCredentialsFilterFactory implements FilterFactory<CallCredentialsFilter> {
private credentials: CallCredentials | null;
constructor(channel: Http2Channel) {
this.credentials = channel.credentials.getCallCredentials();
}
createFilter(callStream: CallStream): CallCredentialsFilter {
return new CallCredentialsFilter(this.credentials.compose(callStream.credentials));
}
}

View File

@ -4,6 +4,8 @@ import {CallCredentials} from './call-credentials';
import {Status} from './constants';
import {Metadata} from './metadata';
import {ObjectDuplex} from './object-stream';
import {Filter} from './filter'
import {FilterStackFactory} from './filter-stack'
export interface CallOptions {
deadline?: Date|number;
@ -61,17 +63,239 @@ export interface CallStream extends ObjectDuplex<WriteObject, Buffer> {
this;
}
enum ReadState {
NO_DATA,
READING_SIZE,
READING_MESSAGE
}
export class Http2CallStream extends stream.Duplex implements CallStream {
private filterStack: Filter;
private statusEmitted: bool = false;
private http2Stream: ClientHttp2Stream | null = null;
private pendingRead: bool = false;
private pendingWrite: Buffer | null = null;
private pendingWriteCallback: Function | null = null;
private pendingFinalCallback: Function | null = null;
private readState: ReadState = ReadState.NO_DATA;
private readCompressFlag: bool = false;
private readPartialSize: Buffer = Buffer.alloc(4);
private readSizeRemaining: number = 4;
private readMessageSize: number = 0;
private readPartialMessage: Buffer[] = [];
private readMessageRemaining = 0;
private unpushedReadMessages: (Buffer | null)[] = [];
// Status code mapped from :status. To be used if grpc-status is not received
private mappedStatusCode: Status = Status.UNKNOWN;
constructor(public readonly methodName: string, public readonly options: CallOptions,
filterStackFactory: FilterStackFactory) {
this.filterStack = FilterStackFactory.createFilter(this);
}
private endCall(status: StatusObject): void {
if (!this.statusEmitted) {
this.emit('status', {code: status, details: details, metadata: new Metadata()});
this.statusEmitted = true;
}
}
attachHttp2Stream(stream: ClientHttp2Stream): void {
throw new Error('Not yet implemented');
if (this.statusEmitted) {
// TODO(murgatroid99): Handle call end before http2 stream start
} else {
this.http2Stream = stream;
stream.on('response', (headers) => {
switch(headers[HTTP2_HEADER_STATUS]) {
// TODO(murgatroid99): handle 100 and 101
case '400':
this.mappedStatusCode = Status.INTERNAL;
break;
case '401':
this.mappedStatusCode = Status.UNAUTHENTICATED;
break;
case '403':
this.mappedStatusCode = Status.PERMISSION_DENIED;
break;
case '404':
this.mappedStatusCode = Status.UNIMPLEMENTED;
break;
case '429':
case '502':
case '503':
case '504':
this.mappedStatusCode = Status.UNAVAILABLE;
break;
default:
this.mappedStatusCode = Status.UNKNOWN;
}
delete headers[HTTP2_HEADERS_STATUS];
delete headers[HTTP2_HEADERS_CONTENT_TYPE];
let metadata: Metadata;
try {
metadata = Metadata.fromHttp2Headers(headers);
} catch (e) {
this.cancelWithStatus(Status.UNKNOWN, e.message);
return;
}
this.filterStack.receiveMetadata(Promise.resolve(metadata)).then((finalMetadata) => {
this.emit('metadata', finalMetadata);
}, (error) => {
this.cancelWithStatus(Status.UNKNOWN, error.message);
});
});
stream.on('trailers', (headers) => {
let code: Status = this.mappedStatusCode;
if (headers.hasOwnProperty('grpc-status')) {
let receivedCode = Number(headers['grpc-status']);
if (possibleCode in Status) {
code = receivedCode;
} else {
code = Status.UNKNOWN;
}
delete headers['grpc-status'];
}
let details: string = '';
if (headers.hasOwnProperty('grpc-message')) {
details = decodeURI(headers['grpc-message']);
}
let metadata: Metadata;
try {
metadata = Metadata.fromHttp2Headers(headers);
} catch (e) {
metadata = new Metadata();
}
this.filterStack.receiveTrailers(Promise.resolve(status)).then((finalStatus) => {
this.endCall(finalStatus);
}, (error) => {
this.endCall({
code: Status.INTERNAL,
details: 'Failed to process received status',
metadata: new Metadata();
});
});
});
stream.on('read', (data) => {
let readHead = 0;
let canPush = true;
while (readHead < data.length) {
switch(this.readState) {
case ReadState.NO_DATA:
readCompressFlag = (data.readUInt8(readHead) !== 0);
this.readState = ReadState.READING_SIZE;
this.readPartialSize.fill(0);
this.readSizeRemaining = 4;
this.readMessageSize = 0;
this.readMessageRemaining = 0;
this.readPartialMessage = [];
break;
case ReadState.READING_SIZE:
let toRead: number = Math.min(data.length - readHead, this.readSizeRemaining);
data.copy(readPartialSize, 4 - this.readSizeRemaining, readHead, readHead + toRead);
this.readSizeRemaining -= toRead;
readHead += toRead;
// readSizeRemaining >=0 here
if (this.readSizeRemaining === 0) {
this.readMessageSize = readPartialSize.readUInt32BE(0);
this.readMessageRemaining = this.readMessageSize;
this.readState = ReadState.READING_MESSAGE;
}
break;
case ReadSize.READING_MESSAGE:
let toRead: number = math.min(data.length - readHead, this.readMessageRemaining);
readPartialMessage.push(data.slice(readHead, readHead + toRead));
this.readMessageRemaining -= toRead;
this.readHead += toRead;
// readMessageRemaining >=0 here
if (this.readMessageRemaining === 0) {
// At this point, we have read a full message
let messageBytes = Buffer.concat(readPartialMessage, readMessageSize);
// TODO(murgatroid99): Add receive message filters
if (canPush) {
if (!this.push(messageBytes)) {
canPush = false;
this.http2Stream.pause();
}
} else {
this.unpushedReadMessages.push(messageBytes);
}
}
}
}
});
stream.on('end', () => {
if (this.unpushedReadMessages.length === 0) {
this.push(null);
} else {
this.unpushedReadMessages.push(null);
}
});
}
}
cancelWithStatus(status: Status, details: string): void {
throw new Error('Not yet implemented');
this.endCall({code: status, details: details, metadata: new Metadata()});
if (this.http2Stream !== null) {
/* TODO(murgatroid99): Determine if we want to send different RST_STREAM
* codes based on the status code */
this.http2Stream.rstWithCancel();
}
}
getPeer(): string {
throw new Error('Not yet implemented');
}
_read(size: number) {
if (this.http2Stream === null) {
this.pendingRead = true;
} else {
while (unpushedReadMessages.length > 0) {
let nextMessage = unpushedReadMessages.shift();
let keepPushing = this.push(nextMessage);
if (nextMessage === null || (!keepPushing)) {
return;
}
}
/* Only resume reading from the http2Stream if we don't have any pending
* messages to emit, and we haven't gotten the signal to stop pushing
* messages */
this.http2Stream.resume();
}
}
// Encode a message to the wire format
private encodeMessage(message: WriteObject): Buffer {
/* unsafeAlloc doesn't initiate the bytes in the buffer. We are explicitly
* overwriting every single byte, so that should be fine */
let output: Buffer = Buffer.unsafeAlloc(message.length + 5);
// TODO(murgatroid99): handle compressed flag appropriately
output.writeUInt8(0, 0);
output.writeUint32BE(message.message.length, 1);
message.message.copy(output, 5);
return output;
}
_write(chunk: WriteObject, encoding: string, cb: Function) {
// TODO(murgatroid99): Add send message filters
let encodedMessage = encodeMessage(chunk);
if (this.http2Stream === null) {
this.pendingWrite = encodedMessage;
this.pendingWriteCallback = cb;
} else {
this.http2Stream.write(encodedMessage, cb);
}
}
_final(cb: Function) {
if (this.http2Stream === null) {
this.pendingFinalCallback = cb;
} else {
this.http2Stream.end(cb);
}
}
}

View File

@ -5,7 +5,13 @@ import {IncomingHttpHeaders, OutgoingHttpHeaders} from 'http';
import * as url from 'url';
import {CallOptions, CallStream} from './call-stream';
import {ChannelCredentials} from './channel-credentials';
import {Metadata} from './metadata';
import {Metadata, MetadataObject} from './metadata';
import {Status} from './constants'
import {FilterStackFactory} from './filter-stack'
import {DeadlineFilterFactory} from './deadline-filter'
import {CallCredentialsFilterFactory} from './call-credentials-filter'
import {Http2FilterFactory} from './http2-filter'
const IDLE_TIMEOUT_MS = 300000;
@ -39,7 +45,7 @@ export enum ConnectivityState {
*/
export interface Channel extends EventEmitter {
createStream(methodName: string, metadata: OutgoingHttp2Headers, options: CallOptions): CallStream;
connect(): void;
connect(() => void): void;
getConnectivityState(): ConnectivityState;
close(): void;
@ -50,17 +56,6 @@ export interface Channel extends EventEmitter {
prependListener(event: string, listener: Function): this;
prependOnceListener(event: string, listener: Function): this;
removeListener(event: string, listener: Function): this;
addListener(event: 'connectivity_state_changed', listener: (state: ConnectivityState) => void): this;
emit(event: 'connectivity_state_changed', state: ConnectivityState): boolean;
on(event: 'connectivity_state_changed', listener: (state: ConnectivityState) => void): this;
once(event: 'connectivity_state_changed', listener: (state: ConnectivityState) => void): this;
prependListener(event: 'connectivity_state_changed', listener: (state: ConnectivityState) => void):
this;
prependOnceListener(
event: 'connectivity_state_changed', listener: (state: ConnectivityState) => void): this;
removeListener(event: 'connectivity_state_changed', listener: (state: ConnectivityState) => void):
this;
}
export class Http2Channel extends EventEmitter implements Channel {
@ -69,13 +64,13 @@ export class Http2Channel extends EventEmitter implements Channel {
/* For now, we have up to one subchannel, which will exist as long as we are
* connecting or trying to connect */
private subChannel : Http2Session | null;
private secureContext : SecureContext | null;
private address : url.Url;
private filterStackFactory : FilterStackFactory;
private transitionToState(newState: ConnectivityState): void {
if (newState !== this.connectivityState) {
this.connectivityState = newState;
this.emit('connectivity_state_changed', newState);
this.emit('connectivityStateChanged', newState);
}
}
@ -85,6 +80,9 @@ export class Http2Channel extends EventEmitter implements Channel {
this.subChannel.on('connect', () => {
this.transitionToState(ConnectivityState.READY);
});
this.subChannel.setTimeout(IDLE_TIMEOUT_MS, () => {
this.goIdle();
});
}
private goIdle(): void {
@ -92,59 +90,62 @@ export class Http2Channel extends EventEmitter implements Channel {
this.transitionToState(ConnectivityState.IDLE);
}
/* Reset the lastRpcActivity date to now, and kick the connectivity state
* machine out of idle */
private kickConnectivityState(): void {
if (this.connectivityState === ConnectivityState.IDLE) {
this.startConnecting();
} else {
clearTimeout(this.idleTimeoutId);
}
this.idleTimeoutId = setTimeout(() => {
this.goIdle();
}, IDLE_TIMEOUT_MS);
}
constructor(address: url.Url,
credentials: ChannelCredentials,
constructor(private readonly address: url.Url,
public readonly credentials: ChannelCredentials,
private readonly options: ChannelOptions) {
this.secureContext = credentials.getSecureContext();
if (this.secureContext === null) {
if (channelCredentials.getSecureContext() === null) {
address.protocol = 'http';
} else {
address.protocol = 'https';
}
this.address = address;
this.filterStackFactory = new FilterStackFactory([
new CompressionFilterFactory(this),
new CallCredentialsFilterFactory(this),
new DeadlineFilterFactory(this)
]);
}
createStream(methodName: string, metadata: OutgoingHttpHeaders, options: CallOptions): CallStream {
createStream(methodName: string, metadata: Metadata, options: CallOptions): CallStream {
if (this.connectivityState === ConnectivityState.SHUTDOWN) {
throw new Error('Channel has been shut down');
}
this.kickConnectivityState();
let stream: Http2CallStream = new Http2CallStream();
metadata[HTTP2_HEADER_AUTHORITY] = this.address.hostname;
metadata[HTTP2_HEADER_METHOD] = 'POST';
metadata[HTTP2_HEADER_PATH] = methodName;
metadata[HTTP2_HEADER_TE] = 'trailers';
if (this.connectivityState === ConnectivityState.READY) {
stream.attachHttp2Stream(this.subchannel.request(metadata));
} else {
let connectCb = (state) => {
if (state === ConnectivityState.READY) {
stream.attachHttp2Stream(this.subchannel.request(metadata));
this.removeListener('connectivity_state_changed', connectCb);
let stream: Http2CallStream = new Http2CallStream(methodName, options, this.filterStackFactory);
let finalMetadata: Promise<Metadata> = stream.filterStack.sendMetadata(Promise.resolve(metadata));
this.connect(() => {
finalMetadata.then((metadataValue) => {
let headers = metadataValue.toHttp2Headers();
headers[HTTP2_HEADER_AUTHORITY] = this.address.hostname;
headers[HTTP2_HEADER_CONTENT_TYPE] = 'application/grpc';
headers[HTTP2_HEADER_METHOD] = 'POST';
headers[HTTP2_HEADER_PATH] = methodName;
headers[HTTP2_HEADER_TE] = 'trailers';
if (stream.isOpen()) {
stream.attachHttp2Stream(this.subchannel.request(headers));
}
};
this.on('connectivity_state_changed', connectCb);
}
}, (error) => {
stream.cancelWithStatus(Status.UNKNOWN, "Failed to generate metadata");
});
});
return stream;
}
connect(): void {
if (this.connectivityState === ConnectivityState.SHUTDOWN) {
throw new Error('Channel has been shut down');
}
connect(callback: () => void): void {
this.kickConnectivityState();
if (this.connectivityState === ConnectivityState.READY) {
setImmediate(callback);
} else {
this.on('connectivityStateChanged', (newState) => {
if (newState === ConnectivityState.READY) {
callback();
}
});
}
}
getConnectivityState(): ConnectivityState{

27
src/compression-filter.ts Normal file
View File

@ -0,0 +1,27 @@
import {Filter, BaseFilter} from './filter'
import {Metadata} from './metadata'
export class CompressionFilter extends BaseFilter implements Filter {
constructor() {}
async sendMetadata(metadata: Promise<Metadata>): Promise<Metadata> {
let headers: Metadata = await metadata;
headers.set('grpc-encoding', 'identity');
headers.set('grpc-accept-encoding', 'identity');
return headers;
}
async receiveMetadata(metadata: Promise<Metadata>): Promise<Metadata {
let headers: Metadata = await metadata;
headers.remove('grpc-encoding');
headers.remove('grpc-accept-encoding');
return headers;
}
}
export class CompressionFilterFactory<CompressionFilter> {
constructor(channel) {}
createFilter(callStream: CallStream): CompressionFilter {
return new CompressionFilter();
}
}

55
src/deadline-filter.ts Normal file
View File

@ -0,0 +1,55 @@
import {Filter} from './filter'
import {Status} from './constants'
const units = [
['m', 1],
['S', 1000],
['M', 60 * 1000],
['H', 60 * 60 * 1000]
]
export class DeadlineFilter extends BaseFilter implements Filter {
private deadline;
constructor(private readonly channel: Channel, private readonly callStream: CallStream) {
let deadline = callStream.deadline;
this.deadline = deadline;
let now: number = (new Date()).getTime();
let timeout = deadline - now;
if (timeout < 0) {
timeout = 0;
}
if (deadline !== Infinity) {
setTimeout(() => {
callStream.cancelWithStatus(Status.DEADLINE_EXCEEDED, 'Deadline exceeded');
}, timeout);
}
}
async sendMetadata(metadata: Promise<Metadata>) {
if (this.deadline === Infinity) {
return await metadata;
}
let timeoutString : Promise<string> = new Promise<string>((resolve, reject) => {
this.channel.connect(() => {
let now = (new Date()).getTime();
let timeoutMs = this.deadline - now;
for (let [unit, factor] of units) {
let amount = timeoutMs / factor;
if (amount < 1e8) {
resolve(String(Math.ceil(amount)) + unit);
return;
}
}
});
});
(await metadata).set('grpc-timeout', await timeoutString);
}
}
export class DeadlineFilterFactory implements FilterFactory<DeadlineFilter> {
constructor(private readonly channel: Http2Channel) {}
createFilter(callStream: CallStream): DeadlineFilter {
return new DeadlineFilter(this.channel, callStream);
}
}

26
src/filter-stack.ts Normal file
View File

@ -0,0 +1,26 @@
import {flow, map} from 'lodash';
import {Filter} from './filter'
export class FilterStack implements Filter {
constructor(private readonly filters: Filter[]) {}
async sendMetadata(metadata: Promise<Metadata>) {
return await flow(map(filters, (filter) => filter.sendMetadata.bind(filter)))(metadata);
}
async receiveMetadata(metadata: Promise<Metadata>) {
return await flowRight(map(filters, (filter) => filter.receiveMetadata.bind(filter)))(metadata);
}
async receiveTrailers(status: Promise<StatusObject>): Promise<StatusObject> {
return await flowRight(map(filters, (filter) => filter.receiveTrailers.bind(filter)))(status);
}
}
export class FilterStackFactory implements FilterFactory<FilterStack> {
constructor(private readonly factories: FilterFactory[]) {}
createFilter(callStream: CallStream): FilterStack {
return new FilterStack(map(factories, (factory) => factory.createFilter(callStream)));
}
}

28
src/filter.ts Normal file
View File

@ -0,0 +1,28 @@
import {Metadata} from './metadata'
import {WriteObject, CallStream} from './call-stream'
export interface Filter {
async sendMetadata(metadata: Promise<Metadata>): Promise<Metadata>;
async receiveMetadata(metadata: Promise<Metadata>): Promise<Metadata>;
async receiveTrailers(status: Promise<StatusObject>): Promise<StatusObject>;
}
export abstract class BaseFilter {
async sendMetadata(metadata: Promise<Metadata>): Promise<Metadata> {
return await metadata;
}
async receiveMetadata(metadata: Promise<Metadata>): Promise<Metadata> {
return await metadata;
}
async receiveTrailers(status: Promise<StatusObject>): Promise<StatusObject> {
return await status;
}
}
export interface FilterFactory<T extends Filter> {
createFilter(callStream: CallStream): T;
}