Resolve comments

This commit is contained in:
murgatroid99 2017-08-28 14:38:30 -07:00
parent 57e78eb9d6
commit f012088ecc
7 changed files with 41 additions and 31 deletions

View File

@ -7,7 +7,7 @@ import {Status} from './constants';
import {Metadata} from './metadata'; import {Metadata} from './metadata';
import {ObjectDuplex} from './object-stream'; import {ObjectDuplex} from './object-stream';
import {Filter} from './filter' import {Filter} from './filter'
import {FilterStackFactory} from './filter-stack' import {FilterStackFactory} from './filter-stack';
const { const {
HTTP2_HEADER_STATUS, HTTP2_HEADER_STATUS,
@ -119,7 +119,7 @@ export class Http2CallStream extends stream.Duplex implements CallStream {
} }
private endCall(status: StatusObject): void { private endCall(status: StatusObject): void {
if (!this.finalStatus === null) { if (this.finalStatus === null) {
this.finalStatus = status; this.finalStatus = status;
this.emit('status', status); this.emit('status', status);
} }
@ -190,11 +190,7 @@ export class Http2CallStream extends stream.Duplex implements CallStream {
} catch (e) { } catch (e) {
metadata = new Metadata(); metadata = new Metadata();
} }
let status: StatusObject = { let status: StatusObject = { code, details, metadata };
code: code,
details: details,
metadata: metadata
};
this.filterStack.receiveTrailers(Promise.resolve(status)).then((finalStatus) => { this.filterStack.receiveTrailers(Promise.resolve(status)).then((finalStatus) => {
this.endCall(finalStatus); this.endCall(finalStatus);
}, (error) => { }, (error) => {
@ -213,6 +209,7 @@ export class Http2CallStream extends stream.Duplex implements CallStream {
switch(this.readState) { switch(this.readState) {
case ReadState.NO_DATA: case ReadState.NO_DATA:
this.readCompressFlag = (data.readUInt8(readHead) !== 0); this.readCompressFlag = (data.readUInt8(readHead) !== 0);
readHead += 1;
this.readState = ReadState.READING_SIZE; this.readState = ReadState.READING_SIZE;
this.readPartialSize.fill(0); this.readPartialSize.fill(0);
this.readSizeRemaining = 4; this.readSizeRemaining = 4;
@ -240,7 +237,7 @@ export class Http2CallStream extends stream.Duplex implements CallStream {
// readMessageRemaining >=0 here // readMessageRemaining >=0 here
if (this.readMessageRemaining === 0) { if (this.readMessageRemaining === 0) {
// At this point, we have read a full message // At this point, we have read a full message
let messageBytes = Buffer.concat(this.readPartialMessage, this.readMessageSize); const messageBytes = Buffer.concat(this.readPartialMessage, this.readMessageSize);
// TODO(murgatroid99): Add receive message filters // TODO(murgatroid99): Add receive message filters
if (canPush) { if (canPush) {
if (!this.push(messageBytes)) { if (!this.push(messageBytes)) {
@ -250,6 +247,7 @@ export class Http2CallStream extends stream.Duplex implements CallStream {
} else { } else {
this.unpushedReadMessages.push(messageBytes); this.unpushedReadMessages.push(messageBytes);
} }
this.readState = ReadState.NO_DATA;
} }
} }
} }
@ -295,6 +293,18 @@ export class Http2CallStream extends stream.Duplex implements CallStream {
metadata: new Metadata() metadata: new Metadata()
}); });
}); });
if (!this.pendingRead) {
stream.pause();
}
if (this.pendingWrite) {
if (!this.pendingWriteCallback) {
throw new Error('Invalid state in write handling code');
}
stream.write(this.pendingWrite, this.pendingWriteCallback);
}
if (this.pendingFinalCallback) {
stream.end(this.pendingFinalCallback);
}
} }
} }
@ -328,8 +338,8 @@ export class Http2CallStream extends stream.Duplex implements CallStream {
this.pendingRead = true; this.pendingRead = true;
} else { } else {
while (this.unpushedReadMessages.length > 0) { while (this.unpushedReadMessages.length > 0) {
let nextMessage = this.unpushedReadMessages.shift(); const nextMessage = this.unpushedReadMessages.shift();
let keepPushing = this.push(nextMessage); const keepPushing = this.push(nextMessage);
if (nextMessage === null || (!keepPushing)) { if (nextMessage === null || (!keepPushing)) {
return; return;
} }
@ -345,7 +355,7 @@ export class Http2CallStream extends stream.Duplex implements CallStream {
private encodeMessage(message: WriteObject): Buffer { private encodeMessage(message: WriteObject): Buffer {
/* allocUnsafe doesn't initiate the bytes in the buffer. We are explicitly /* allocUnsafe doesn't initiate the bytes in the buffer. We are explicitly
* overwriting every single byte, so that should be fine */ * overwriting every single byte, so that should be fine */
let output: Buffer = Buffer.allocUnsafe(message.message.length + 5); const output: Buffer = Buffer.allocUnsafe(message.message.length + 5);
// TODO(murgatroid99): handle compressed flag appropriately // TODO(murgatroid99): handle compressed flag appropriately
output.writeUInt8(0, 0); output.writeUInt8(0, 0);
output.writeUInt32BE(message.message.length, 1); output.writeUInt32BE(message.message.length, 1);
@ -355,7 +365,7 @@ export class Http2CallStream extends stream.Duplex implements CallStream {
_write(chunk: WriteObject, encoding: string, cb: Function) { _write(chunk: WriteObject, encoding: string, cb: Function) {
// TODO(murgatroid99): Add send message filters // TODO(murgatroid99): Add send message filters
let encodedMessage = this.encodeMessage(chunk); const encodedMessage = this.encodeMessage(chunk);
if (this.http2Stream === null) { if (this.http2Stream === null) {
this.pendingWrite = encodedMessage; this.pendingWrite = encodedMessage;
this.pendingWriteCallback = cb; this.pendingWriteCallback = cb;

View File

@ -103,8 +103,7 @@ class SecureChannelCredentialsImpl extends ChannelCredentialsImpl {
} }
compose(callCredentials: CallCredentials) : ChannelCredentialsImpl { compose(callCredentials: CallCredentials) : ChannelCredentialsImpl {
const combinedCallCredentials = const combinedCallCredentials = this.callCredentials.compose(callCredentials);
this.callCredentials.compose(callCredentials);
return new SecureChannelCredentialsImpl(this.secureContext, return new SecureChannelCredentialsImpl(this.secureContext,
combinedCallCredentials); combinedCallCredentials);
} }

View File

@ -82,12 +82,14 @@ export class Http2Channel extends EventEmitter implements Channel {
} else { } else {
this.subChannel = http2.connect(this.address, {secureContext}); this.subChannel = http2.connect(this.address, {secureContext});
} }
this.subChannel.on('connect', () => { this.subChannel.once('connect', () => {
this.transitionToState(ConnectivityState.READY); this.transitionToState(ConnectivityState.READY);
}); });
this.subChannel.setTimeout(IDLE_TIMEOUT_MS, () => { this.subChannel.setTimeout(IDLE_TIMEOUT_MS, () => {
this.goIdle(); this.goIdle();
}); });
/* TODO(murgatroid99): add connection-level error handling with exponential
* reconnection backoff */
} }
private goIdle(): void { private goIdle(): void {
@ -155,9 +157,8 @@ export class Http2Channel extends EventEmitter implements Channel {
} }
let finalOptions: CallStreamOptions = { let finalOptions: CallStreamOptions = {
deadline: options.deadline === undefined ? Infinity : options.deadline, deadline: options.deadline === undefined ? Infinity : options.deadline,
credentials: options.credentials === undefined ? credentials: options.credentials || CallCredentials.createEmpty(),
CallCredentials.createEmpty() : options.credentials, flags: options.flags || 0
flags: options.flags === undefined ? 0 : options.flags
} }
let stream: Http2CallStream = new Http2CallStream(methodName, finalOptions, this.filterStackFactory); let stream: Http2CallStream = new Http2CallStream(methodName, finalOptions, this.filterStackFactory);
this.startHttp2Stream(methodName, stream, metadata); this.startHttp2Stream(methodName, stream, metadata);
@ -169,7 +170,7 @@ export class Http2Channel extends EventEmitter implements Channel {
if (this.connectivityState === ConnectivityState.READY) { if (this.connectivityState === ConnectivityState.READY) {
setImmediate(callback); setImmediate(callback);
} else { } else {
this.on('connectivityStateChanged', (newState) => { this.once('connectivityStateChanged', (newState) => {
if (newState === ConnectivityState.READY) { if (newState === ConnectivityState.READY) {
callback(); callback();
} }

View File

@ -4,19 +4,16 @@ import {Filter, BaseFilter, FilterFactory} from './filter'
import {Metadata} from './metadata' import {Metadata} from './metadata'
export class CompressionFilter extends BaseFilter implements Filter { export class CompressionFilter extends BaseFilter implements Filter {
constructor() {
super();
}
async sendMetadata(metadata: Promise<Metadata>): Promise<Metadata> { async sendMetadata(metadata: Promise<Metadata>): Promise<Metadata> {
let headers: Metadata = await metadata; const headers: Metadata = await metadata;
headers.set('grpc-encoding', 'identity'); headers.set('grpc-encoding', 'identity');
headers.set('grpc-accept-encoding', 'identity'); headers.set('grpc-accept-encoding', 'identity');
return headers; return headers;
} }
async receiveMetadata(metadata: Promise<Metadata>): Promise<Metadata> { async receiveMetadata(metadata: Promise<Metadata>): Promise<Metadata> {
let headers: Metadata = await metadata; const headers: Metadata = await metadata;
headers.remove('grpc-encoding'); headers.remove('grpc-encoding');
headers.remove('grpc-accept-encoding'); headers.remove('grpc-accept-encoding');
return headers; return headers;

View File

@ -6,16 +6,16 @@ import {Filter, FilterFactory} from './filter';
export class FilterStack implements Filter { export class FilterStack implements Filter {
constructor(private readonly filters: Filter[]) {} constructor(private readonly filters: Filter[]) {}
async sendMetadata(metadata: Promise<Metadata>) { sendMetadata(metadata: Promise<Metadata>) {
return await flow(map(this.filters, (filter) => filter.sendMetadata.bind(filter)))(metadata); return flow(map(this.filters, (filter) => filter.sendMetadata.bind(filter)))(metadata);
} }
async receiveMetadata(metadata: Promise<Metadata>) { receiveMetadata(metadata: Promise<Metadata>) {
return await flowRight(map(this.filters, (filter) => filter.receiveMetadata.bind(filter)))(metadata); return flowRight(map(this.filters, (filter) => filter.receiveMetadata.bind(filter)))(metadata);
} }
async receiveTrailers(status: Promise<StatusObject>): Promise<StatusObject> { receiveTrailers(status: Promise<StatusObject>): Promise<StatusObject> {
return await flowRight(map(this.filters, (filter) => filter.receiveTrailers.bind(filter)))(status); return flowRight(map(this.filters, (filter) => filter.receiveTrailers.bind(filter)))(status);
} }
} }

View File

@ -1,6 +1,9 @@
import {Metadata} from './metadata' import {Metadata} from './metadata'
import {StatusObject, CallStream} from './call-stream' import {StatusObject, CallStream} from './call-stream'
/**
* Filter classes represent related per-call logic and state that is primarily
* used to modify incoming and outgoing data */
export interface Filter { export interface Filter {
sendMetadata(metadata: Promise<Metadata>): Promise<Metadata>; sendMetadata(metadata: Promise<Metadata>): Promise<Metadata>;