diff --git a/src/call-stream.ts b/src/call-stream.ts index 5f788f79..39e8343d 100644 --- a/src/call-stream.ts +++ b/src/call-stream.ts @@ -7,7 +7,7 @@ import {Status} from './constants'; import {Metadata} from './metadata'; import {ObjectDuplex} from './object-stream'; import {Filter} from './filter' -import {FilterStackFactory} from './filter-stack' +import {FilterStackFactory} from './filter-stack'; const { HTTP2_HEADER_STATUS, @@ -119,7 +119,7 @@ export class Http2CallStream extends stream.Duplex implements CallStream { } private endCall(status: StatusObject): void { - if (!this.finalStatus === null) { + if (this.finalStatus === null) { this.finalStatus = status; this.emit('status', status); } @@ -190,11 +190,7 @@ export class Http2CallStream extends stream.Duplex implements CallStream { } catch (e) { metadata = new Metadata(); } - let status: StatusObject = { - code: code, - details: details, - metadata: metadata - }; + let status: StatusObject = { code, details, metadata }; this.filterStack.receiveTrailers(Promise.resolve(status)).then((finalStatus) => { this.endCall(finalStatus); }, (error) => { @@ -213,6 +209,7 @@ export class Http2CallStream extends stream.Duplex implements CallStream { switch(this.readState) { case ReadState.NO_DATA: this.readCompressFlag = (data.readUInt8(readHead) !== 0); + readHead += 1; this.readState = ReadState.READING_SIZE; this.readPartialSize.fill(0); this.readSizeRemaining = 4; @@ -240,7 +237,7 @@ export class Http2CallStream extends stream.Duplex implements CallStream { // readMessageRemaining >=0 here if (this.readMessageRemaining === 0) { // At this point, we have read a full message - let messageBytes = Buffer.concat(this.readPartialMessage, this.readMessageSize); + const messageBytes = Buffer.concat(this.readPartialMessage, this.readMessageSize); // TODO(murgatroid99): Add receive message filters if (canPush) { if (!this.push(messageBytes)) { @@ -250,6 +247,7 @@ export class Http2CallStream extends stream.Duplex implements CallStream { } else { this.unpushedReadMessages.push(messageBytes); } + this.readState = ReadState.NO_DATA; } } } @@ -295,6 +293,18 @@ export class Http2CallStream extends stream.Duplex implements CallStream { metadata: new Metadata() }); }); + if (!this.pendingRead) { + stream.pause(); + } + if (this.pendingWrite) { + if (!this.pendingWriteCallback) { + throw new Error('Invalid state in write handling code'); + } + stream.write(this.pendingWrite, this.pendingWriteCallback); + } + if (this.pendingFinalCallback) { + stream.end(this.pendingFinalCallback); + } } } @@ -328,8 +338,8 @@ export class Http2CallStream extends stream.Duplex implements CallStream { this.pendingRead = true; } else { while (this.unpushedReadMessages.length > 0) { - let nextMessage = this.unpushedReadMessages.shift(); - let keepPushing = this.push(nextMessage); + const nextMessage = this.unpushedReadMessages.shift(); + const keepPushing = this.push(nextMessage); if (nextMessage === null || (!keepPushing)) { return; } @@ -345,7 +355,7 @@ export class Http2CallStream extends stream.Duplex implements CallStream { private encodeMessage(message: WriteObject): Buffer { /* allocUnsafe doesn't initiate the bytes in the buffer. We are explicitly * overwriting every single byte, so that should be fine */ - let output: Buffer = Buffer.allocUnsafe(message.message.length + 5); + const output: Buffer = Buffer.allocUnsafe(message.message.length + 5); // TODO(murgatroid99): handle compressed flag appropriately output.writeUInt8(0, 0); output.writeUInt32BE(message.message.length, 1); @@ -355,7 +365,7 @@ export class Http2CallStream extends stream.Duplex implements CallStream { _write(chunk: WriteObject, encoding: string, cb: Function) { // TODO(murgatroid99): Add send message filters - let encodedMessage = this.encodeMessage(chunk); + const encodedMessage = this.encodeMessage(chunk); if (this.http2Stream === null) { this.pendingWrite = encodedMessage; this.pendingWriteCallback = cb; diff --git a/src/channel-credentials.ts b/src/channel-credentials.ts index 9a34fc0d..49a93422 100644 --- a/src/channel-credentials.ts +++ b/src/channel-credentials.ts @@ -103,8 +103,7 @@ class SecureChannelCredentialsImpl extends ChannelCredentialsImpl { } compose(callCredentials: CallCredentials) : ChannelCredentialsImpl { - const combinedCallCredentials = - this.callCredentials.compose(callCredentials); + const combinedCallCredentials = this.callCredentials.compose(callCredentials); return new SecureChannelCredentialsImpl(this.secureContext, combinedCallCredentials); } diff --git a/src/channel.ts b/src/channel.ts index aa3b92fb..5e9cffbc 100644 --- a/src/channel.ts +++ b/src/channel.ts @@ -82,12 +82,14 @@ export class Http2Channel extends EventEmitter implements Channel { } else { this.subChannel = http2.connect(this.address, {secureContext}); } - this.subChannel.on('connect', () => { + this.subChannel.once('connect', () => { this.transitionToState(ConnectivityState.READY); }); this.subChannel.setTimeout(IDLE_TIMEOUT_MS, () => { this.goIdle(); }); + /* TODO(murgatroid99): add connection-level error handling with exponential + * reconnection backoff */ } private goIdle(): void { @@ -155,9 +157,8 @@ export class Http2Channel extends EventEmitter implements Channel { } let finalOptions: CallStreamOptions = { deadline: options.deadline === undefined ? Infinity : options.deadline, - credentials: options.credentials === undefined ? - CallCredentials.createEmpty() : options.credentials, - flags: options.flags === undefined ? 0 : options.flags + credentials: options.credentials || CallCredentials.createEmpty(), + flags: options.flags || 0 } let stream: Http2CallStream = new Http2CallStream(methodName, finalOptions, this.filterStackFactory); this.startHttp2Stream(methodName, stream, metadata); @@ -169,7 +170,7 @@ export class Http2Channel extends EventEmitter implements Channel { if (this.connectivityState === ConnectivityState.READY) { setImmediate(callback); } else { - this.on('connectivityStateChanged', (newState) => { + this.once('connectivityStateChanged', (newState) => { if (newState === ConnectivityState.READY) { callback(); } diff --git a/src/compression-filter.ts b/src/compression-filter.ts index b010f4c5..6ec5598e 100644 --- a/src/compression-filter.ts +++ b/src/compression-filter.ts @@ -4,19 +4,16 @@ import {Filter, BaseFilter, FilterFactory} from './filter' import {Metadata} from './metadata' export class CompressionFilter extends BaseFilter implements Filter { - constructor() { - super(); - } async sendMetadata(metadata: Promise): Promise { - let headers: Metadata = await metadata; + const headers: Metadata = await metadata; headers.set('grpc-encoding', 'identity'); headers.set('grpc-accept-encoding', 'identity'); return headers; } async receiveMetadata(metadata: Promise): Promise { - let headers: Metadata = await metadata; + const headers: Metadata = await metadata; headers.remove('grpc-encoding'); headers.remove('grpc-accept-encoding'); return headers; diff --git a/src/filter-stack.ts b/src/filter-stack.ts index 2a402c3d..97e94c5d 100644 --- a/src/filter-stack.ts +++ b/src/filter-stack.ts @@ -6,16 +6,16 @@ import {Filter, FilterFactory} from './filter'; export class FilterStack implements Filter { constructor(private readonly filters: Filter[]) {} - async sendMetadata(metadata: Promise) { - return await flow(map(this.filters, (filter) => filter.sendMetadata.bind(filter)))(metadata); + sendMetadata(metadata: Promise) { + return flow(map(this.filters, (filter) => filter.sendMetadata.bind(filter)))(metadata); } - async receiveMetadata(metadata: Promise) { - return await flowRight(map(this.filters, (filter) => filter.receiveMetadata.bind(filter)))(metadata); + receiveMetadata(metadata: Promise) { + return flowRight(map(this.filters, (filter) => filter.receiveMetadata.bind(filter)))(metadata); } - async receiveTrailers(status: Promise): Promise { - return await flowRight(map(this.filters, (filter) => filter.receiveTrailers.bind(filter)))(status); + receiveTrailers(status: Promise): Promise { + return flowRight(map(this.filters, (filter) => filter.receiveTrailers.bind(filter)))(status); } } diff --git a/src/filter.ts b/src/filter.ts index 058c45c2..f0f2affe 100644 --- a/src/filter.ts +++ b/src/filter.ts @@ -1,6 +1,9 @@ import {Metadata} from './metadata' import {StatusObject, CallStream} from './call-stream' +/** + * Filter classes represent related per-call logic and state that is primarily + * used to modify incoming and outgoing data */ export interface Filter { sendMetadata(metadata: Promise): Promise; diff --git a/tsconfig.json b/tsconfig.json index e189ea83..62024719 100644 --- a/tsconfig.json +++ b/tsconfig.json @@ -13,4 +13,4 @@ "exclude": [ "node_modules" ] -} \ No newline at end of file +}