Format code

This commit is contained in:
murgatroid99 2018-05-30 16:08:42 -07:00
parent 3edea49bb3
commit acfcc9f981
8 changed files with 96 additions and 82 deletions

View File

@ -143,7 +143,7 @@ export class Http2CallStream extends Duplex implements CallStream {
} }
} }
private filterReceivedMessage(framedMessage: Buffer | null) { private filterReceivedMessage(framedMessage: Buffer|null) {
if (framedMessage === null) { if (framedMessage === null) {
if (this.canPush) { if (this.canPush) {
this.push(null); this.push(null);
@ -153,12 +153,13 @@ export class Http2CallStream extends Duplex implements CallStream {
return; return;
} }
this.isReadFilterPending = true; this.isReadFilterPending = true;
this.filterStack.receiveMessage(Promise.resolve(framedMessage)).then( this.filterStack.receiveMessage(Promise.resolve(framedMessage))
this.handleFilteredRead.bind(this), .then(
this.handleFilterError.bind(this)); this.handleFilteredRead.bind(this),
this.handleFilterError.bind(this));
} }
private tryPush(messageBytes: Buffer | null): void { private tryPush(messageBytes: Buffer|null): void {
if (this.isReadFilterPending) { if (this.isReadFilterPending) {
this.unfilteredReadMessages.push(messageBytes); this.unfilteredReadMessages.push(messageBytes);
} else { } else {
@ -268,7 +269,7 @@ export class Http2CallStream extends Duplex implements CallStream {
while (readHead < data.length) { while (readHead < data.length) {
switch (this.readState) { switch (this.readState) {
case ReadState.NO_DATA: case ReadState.NO_DATA:
this.readCompressFlag = data.slice(readHead, readHead+1); this.readCompressFlag = data.slice(readHead, readHead + 1);
readHead += 1; readHead += 1;
this.readState = ReadState.READING_SIZE; this.readState = ReadState.READING_SIZE;
this.readPartialSize.fill(0); this.readPartialSize.fill(0);
@ -291,7 +292,8 @@ export class Http2CallStream extends Duplex implements CallStream {
if (this.readMessageRemaining > 0) { if (this.readMessageRemaining > 0) {
this.readState = ReadState.READING_MESSAGE; this.readState = ReadState.READING_MESSAGE;
} else { } else {
this.tryPush(Buffer.concat([this.readCompressFlag, this.readPartialSize])); this.tryPush(Buffer.concat(
[this.readCompressFlag, this.readPartialSize]));
this.readState = ReadState.NO_DATA; this.readState = ReadState.NO_DATA;
} }
} }
@ -306,8 +308,11 @@ export class Http2CallStream extends Duplex implements CallStream {
// readMessageRemaining >=0 here // readMessageRemaining >=0 here
if (this.readMessageRemaining === 0) { if (this.readMessageRemaining === 0) {
// At this point, we have read a full message // At this point, we have read a full message
const framedMessageBuffers = [this.readCompressFlag, this.readPartialSize].concat(this.readPartialMessage); const framedMessageBuffers = [
const framedMessage = Buffer.concat(framedMessageBuffers, this.readMessageSize + 5); this.readCompressFlag, this.readPartialSize
].concat(this.readPartialMessage);
const framedMessage = Buffer.concat(
framedMessageBuffers, this.readMessageSize + 5);
this.tryPush(framedMessage); this.tryPush(framedMessage);
this.readState = ReadState.NO_DATA; this.readState = ReadState.NO_DATA;
} }

View File

@ -84,7 +84,7 @@ export interface Channel extends EventEmitter {
/* This should be a real subchannel class that contains a ClientHttp2Session, /* This should be a real subchannel class that contains a ClientHttp2Session,
* but for now this serves its purpose */ * but for now this serves its purpose */
type Http2SubChannel = http2.ClientHttp2Session & { type Http2SubChannel = http2.ClientHttp2Session&{
/* Count the number of currently active streams associated with the session. /* Count the number of currently active streams associated with the session.
* The purpose of this is to keep the session reffed if and only if there * The purpose of this is to keep the session reffed if and only if there
* is at least one active stream */ * is at least one active stream */

View File

@ -1,9 +1,10 @@
import {CallStream, WriteObject, WriteFlags} from './call-stream'; import * as zlib from 'zlib';
import {CallStream, WriteFlags, WriteObject} from './call-stream';
import {Channel} from './channel'; import {Channel} from './channel';
import {Status} from './constants';
import {BaseFilter, Filter, FilterFactory} from './filter'; import {BaseFilter, Filter, FilterFactory} from './filter';
import {Metadata, MetadataValue} from './metadata'; import {Metadata, MetadataValue} from './metadata';
import {Status} from './constants';
import * as zlib from 'zlib';
abstract class CompressionHandler { abstract class CompressionHandler {
protected abstract compressMessage(message: Buffer): Promise<Buffer>; protected abstract compressMessage(message: Buffer): Promise<Buffer>;
@ -38,7 +39,7 @@ abstract class CompressionHandler {
} }
} }
class IdentityHandler extends CompressionHandler{ class IdentityHandler extends CompressionHandler {
async compressMessage(message: Buffer) { async compressMessage(message: Buffer) {
return message; return message;
} }
@ -53,61 +54,57 @@ class IdentityHandler extends CompressionHandler{
return output; return output;
} }
decompressMessage(message: Buffer) : Promise<Buffer> { decompressMessage(message: Buffer): Promise<Buffer> {
return Promise.reject<Buffer>(new Error( return Promise.reject<Buffer>(new Error(
'Received compressed message but "grpc-encoding" header was identity')); 'Received compressed message but "grpc-encoding" header was identity'));
} }
} }
class DeflateHandler extends CompressionHandler { class DeflateHandler extends CompressionHandler {
compressMessage(message: Buffer) { compressMessage(message: Buffer) {
return new Promise<Buffer>((resolve, reject) => { return new Promise<Buffer>(
zlib.deflate(message, (err, output) => { (resolve, reject) => {zlib.deflate(message, (err, output) => {
if (err) { if (err) {
reject(err); reject(err);
} else { } else {
resolve(output); resolve(output);
} }
}) })});
});
} }
decompressMessage(message: Buffer) { decompressMessage(message: Buffer) {
return new Promise<Buffer>((resolve, reject) => { return new Promise<Buffer>(
zlib.inflate(message, (err, output) => { (resolve, reject) => {zlib.inflate(message, (err, output) => {
if (err) { if (err) {
reject(err); reject(err);
} else { } else {
resolve(output); resolve(output);
} }
}) })});
});
} }
} }
class GzipHandler extends CompressionHandler { class GzipHandler extends CompressionHandler {
compressMessage(message: Buffer) { compressMessage(message: Buffer) {
return new Promise<Buffer>((resolve, reject) => { return new Promise<Buffer>(
zlib.gzip(message, (err, output) => { (resolve, reject) => {zlib.gzip(message, (err, output) => {
if (err) { if (err) {
reject(err); reject(err);
} else { } else {
resolve(output); resolve(output);
} }
}) })});
});
} }
decompressMessage(message: Buffer) { decompressMessage(message: Buffer) {
return new Promise<Buffer>((resolve, reject) => { return new Promise<Buffer>(
zlib.unzip(message, (err, output) => { (resolve, reject) => {zlib.unzip(message, (err, output) => {
if (err) { if (err) {
reject(err); reject(err);
} else { } else {
resolve(output); resolve(output);
} }
}) })});
});
} }
} }
@ -117,26 +114,27 @@ class UnknownHandler extends CompressionHandler {
} }
compressMessage(message: Buffer): Promise<Buffer> { compressMessage(message: Buffer): Promise<Buffer> {
return Promise.reject<Buffer>(new Error( return Promise.reject<Buffer>(new Error(
`Received message compressed wth unsupported compression method ${this.compressionName}`)); `Received message compressed wth unsupported compression method ${
this.compressionName}`));
} }
decompressMessage(message: Buffer): Promise<Buffer> { decompressMessage(message: Buffer): Promise<Buffer> {
// This should be unreachable // This should be unreachable
return Promise.reject<Buffer>(new Error( return Promise.reject<Buffer>(
`Compression method not supported: ${this.compressionName}`)); new Error(`Compression method not supported: ${this.compressionName}`));
} }
} }
function getCompressionHandler(compressionName: string): CompressionHandler { function getCompressionHandler(compressionName: string): CompressionHandler {
switch (compressionName) { switch (compressionName) {
case 'identity': case 'identity':
return new IdentityHandler(); return new IdentityHandler();
case 'deflate': case 'deflate':
return new DeflateHandler(); return new DeflateHandler();
case 'gzip': case 'gzip':
return new GzipHandler(); return new GzipHandler();
default: default:
return new UnknownHandler(compressionName); return new UnknownHandler(compressionName);
} }
} }
@ -165,15 +163,16 @@ export class CompressionFilter extends BaseFilter implements Filter {
} }
async sendMessage(message: Promise<WriteObject>): Promise<WriteObject> { async sendMessage(message: Promise<WriteObject>): Promise<WriteObject> {
/* This filter is special. The input message is the bare message bytes, /* This filter is special. The input message is the bare message bytes,
* and the output is a framed and possibly compressed message. For this * and the output is a framed and possibly compressed message. For this
* reason, this filter should be at the bottom of the filter stack */ * reason, this filter should be at the bottom of the filter stack */
const resolvedMessage: WriteObject = await message; const resolvedMessage: WriteObject = await message;
const compress = resolvedMessage.flags === undefined ? false : const compress = resolvedMessage.flags === undefined ?
(resolvedMessage.flags & WriteFlags.NoCompress) === 0; false :
(resolvedMessage.flags & WriteFlags.NoCompress) === 0;
return { return {
message: await this.sendCompression.writeMessage(resolvedMessage.message, compress), message: await this.sendCompression.writeMessage(
resolvedMessage.message, compress),
flags: resolvedMessage.flags flags: resolvedMessage.flags
}; };
} }

View File

@ -19,14 +19,13 @@ export class FilterStack implements Filter {
} }
sendMessage(message: Promise<WriteObject>): Promise<WriteObject> { sendMessage(message: Promise<WriteObject>): Promise<WriteObject> {
return flow(map( return flow(map(this.filters, (filter) => filter.sendMessage.bind(filter)))(
this.filters, (filter) => filter.sendMessage.bind(filter)))(message); message);
} }
receiveMessage(message: Promise<Buffer>): Promise<Buffer> { receiveMessage(message: Promise<Buffer>): Promise<Buffer> {
return flowRight( return flowRight(map(
map(this.filters, (filter) => filter.receiveMessage.bind(filter)))( this.filters, (filter) => filter.receiveMessage.bind(filter)))(message);
message);
} }
receiveTrailers(status: Promise<StatusObject>): Promise<StatusObject> { receiveTrailers(status: Promise<StatusObject>): Promise<StatusObject> {

View File

@ -15,8 +15,8 @@ interface IndexedObject {
function mixin(...sources: IndexedObject[]) { function mixin(...sources: IndexedObject[]) {
const result: {[key: string]: Function} = {}; const result: {[key: string]: Function} = {};
for(const source of sources) { for (const source of sources) {
for(const propName of Object.getOwnPropertyNames(source)) { for (const propName of Object.getOwnPropertyNames(source)) {
const property: any = source[propName]; const property: any = source[propName];
if (typeof property === 'function') { if (typeof property === 'function') {
result[propName] = property; result[propName] = property;
@ -35,7 +35,8 @@ export interface OAuth2Client {
/**** Client Credentials ****/ /**** Client Credentials ****/
// Using assign only copies enumerable properties, which is what we want // Using assign only copies enumerable properties, which is what we want
export const credentials = mixin({ export const credentials = mixin(
{
/** /**
* Create a gRPC credential from a Google credential object. * Create a gRPC credential from a Google credential object.
* @param googleCredentials The authentication client to use. * @param googleCredentials The authentication client to use.
@ -84,7 +85,8 @@ export const credentials = mixin({
CallCredentials => { CallCredentials => {
return additional.reduce((acc, other) => acc.compose(other), first); return additional.reduce((acc, other) => acc.compose(other), first);
} }
}, ChannelCredentials, CallCredentials); },
ChannelCredentials, CallCredentials);
/**** Metadata ****/ /**** Metadata ****/

View File

@ -6,9 +6,13 @@ import {ChannelCredentials} from './channel-credentials';
import {Client, UnaryCallback} from './client'; import {Client, UnaryCallback} from './client';
import {Metadata} from './metadata'; import {Metadata} from './metadata';
export interface Serialize<T> { (value: T): Buffer; } export interface Serialize<T> {
(value: T): Buffer;
}
export interface Deserialize<T> { (bytes: Buffer): T; } export interface Deserialize<T> {
(bytes: Buffer): T;
}
export interface MethodDefinition<RequestType, ResponseType> { export interface MethodDefinition<RequestType, ResponseType> {
path: string; path: string;
@ -25,7 +29,9 @@ export interface ServiceDefinition {
[index: string]: MethodDefinition<object, object>; [index: string]: MethodDefinition<object, object>;
} }
export interface PackageDefinition { [index: string]: ServiceDefinition; } export interface PackageDefinition {
[index: string]: ServiceDefinition;
}
function getDefaultValues<T>(metadata?: Metadata, options?: T): function getDefaultValues<T>(metadata?: Metadata, options?: T):
{metadata: Metadata; options: Partial<T>;} { {metadata: Metadata; options: Partial<T>;} {

View File

@ -3,7 +3,9 @@ import {forOwn} from 'lodash';
export type MetadataValue = string|Buffer; export type MetadataValue = string|Buffer;
export interface MetadataObject { [key: string]: MetadataValue[]; } export interface MetadataObject {
[key: string]: MetadataValue[];
}
function cloneMetadataObject(repr: MetadataObject): MetadataObject { function cloneMetadataObject(repr: MetadataObject): MetadataObject {
const result: MetadataObject = {}; const result: MetadataObject = {};

View File

@ -6,13 +6,13 @@ import * as stream from 'stream';
import {CallCredentials} from '../src/call-credentials'; import {CallCredentials} from '../src/call-credentials';
import {Http2CallStream} from '../src/call-stream'; import {Http2CallStream} from '../src/call-stream';
import {Channel} from '../src/channel';
import {CompressionFilterFactory} from '../src/compression-filter'; import {CompressionFilterFactory} from '../src/compression-filter';
import {Status} from '../src/constants'; import {Status} from '../src/constants';
import {FilterStackFactory} from '../src/filter-stack'; import {FilterStackFactory} from '../src/filter-stack';
import {Metadata} from '../src/metadata'; import {Metadata} from '../src/metadata';
import {assert2, mockFunction} from './common'; import {assert2, mockFunction} from './common';
import { Channel } from '../src/channel';
interface DataFrames { interface DataFrames {
payload: Buffer; payload: Buffer;
@ -89,7 +89,8 @@ describe('CallStream', () => {
* Currently the channel is unused, so we can replace it with an empty object, * Currently the channel is unused, so we can replace it with an empty object,
* but this might break if we start checking channel arguments, in which case * but this might break if we start checking channel arguments, in which case
* we will need a more sophisticated fake */ * we will need a more sophisticated fake */
const filterStackFactory = new FilterStackFactory([new CompressionFilterFactory(<Channel>{})]); const filterStackFactory =
new FilterStackFactory([new CompressionFilterFactory(<Channel>{})]);
const message = 'eat this message'; // 16 bytes const message = 'eat this message'; // 16 bytes
beforeEach(() => { beforeEach(() => {