Merge remote-tracking branch 'upstream/v1.11.x' into v1.11.x_merge

This commit is contained in:
murgatroid99 2018-05-03 14:30:11 -07:00
commit a6747e910b
36 changed files with 940 additions and 580 deletions

View File

@ -1,5 +1,7 @@
# Pure JavaScript gRPC Client
**Note: This is an alpha-level release. Some APIs may not yet be present and there may be bugs. Please report any that you encounter**
## Installation
Node 9.x or greater is required.

View File

@ -10,11 +10,10 @@ export class CallCredentialsFilter extends BaseFilter implements Filter {
private serviceUrl: string;
constructor(
private readonly credentials: CallCredentials,
private readonly host: string,
private readonly path: string) {
private readonly host: string, private readonly path: string) {
super();
let splitPath: string[] = path.split('/');
let serviceName: string = '';
const splitPath: string[] = path.split('/');
let serviceName = '';
/* The standard path format is "/{serviceName}/{methodName}", so if we split
* by '/', the first item should be empty and the second should be the
* service name */
@ -27,8 +26,9 @@ export class CallCredentialsFilter extends BaseFilter implements Filter {
}
async sendMetadata(metadata: Promise<Metadata>): Promise<Metadata> {
let credsMetadata = this.credentials.generateMetadata({ service_url: this.serviceUrl });
let resultMetadata = await metadata;
const credsMetadata =
this.credentials.generateMetadata({service_url: this.serviceUrl});
const resultMetadata = await metadata;
resultMetadata.merge(await credsMetadata);
return resultMetadata;
}
@ -43,8 +43,7 @@ export class CallCredentialsFilterFactory implements
createFilter(callStream: CallStream): CallCredentialsFilter {
return new CallCredentialsFilter(
this.credentials.compose(callStream.getCredentials()),
callStream.getHost(),
callStream.getMethod());
this.credentials.compose(callStream.getCredentials()),
callStream.getHost(), callStream.getMethod());
}
}

View File

@ -2,39 +2,59 @@ import {map, reduce} from 'lodash';
import {Metadata} from './metadata';
export type CallMetadataOptions = { service_url: string; };
export type CallMetadataOptions = {
service_url: string;
};
export type CallMetadataGenerator =
(options: CallMetadataOptions, cb: (err: Error|null, metadata?: Metadata) => void) =>
void;
(options: CallMetadataOptions,
cb: (err: Error|null, metadata?: Metadata) => void) => void;
/**
* A class that represents a generic method of adding authentication-related
* metadata on a per-request basis.
*/
export interface CallCredentials {
export abstract class CallCredentials {
/**
* Asynchronously generates a new Metadata object.
* @param options Options used in generating the Metadata object.
*/
generateMetadata(options: CallMetadataOptions): Promise<Metadata>;
abstract generateMetadata(options: CallMetadataOptions): Promise<Metadata>;
/**
* Creates a new CallCredentials object from properties of both this and
* another CallCredentials object. This object's metadata generator will be
* called first.
* @param callCredentials The other CallCredentials object.
*/
compose(callCredentials: CallCredentials): CallCredentials;
abstract compose(callCredentials: CallCredentials): CallCredentials;
/**
* Creates a new CallCredentials object from a given function that generates
* Metadata objects.
* @param metadataGenerator A function that accepts a set of options, and
* generates a Metadata object based on these options, which is passed back
* to the caller via a supplied (err, metadata) callback.
*/
static createFromMetadataGenerator(metadataGenerator: CallMetadataGenerator):
CallCredentials {
return new SingleCallCredentials(metadataGenerator);
}
static createEmpty(): CallCredentials {
return new EmptyCallCredentials();
}
}
class ComposedCallCredentials implements CallCredentials {
constructor(private creds: CallCredentials[]) {}
class ComposedCallCredentials extends CallCredentials {
constructor(private creds: CallCredentials[]) {
super();
}
async generateMetadata(options: CallMetadataOptions): Promise<Metadata> {
let base: Metadata = new Metadata();
let generated: Metadata[] = await Promise.all(
const base: Metadata = new Metadata();
const generated: Metadata[] = await Promise.all(
map(this.creds, (cred) => cred.generateMetadata(options)));
for (let gen of generated) {
for (const gen of generated) {
base.merge(gen);
}
return base;
@ -45,8 +65,10 @@ class ComposedCallCredentials implements CallCredentials {
}
}
class SingleCallCredentials implements CallCredentials {
constructor(private metadataGenerator: CallMetadataGenerator) {}
class SingleCallCredentials extends CallCredentials {
constructor(private metadataGenerator: CallMetadataGenerator) {
super();
}
generateMetadata(options: CallMetadataOptions): Promise<Metadata> {
return new Promise<Metadata>((resolve, reject) => {
@ -65,7 +87,7 @@ class SingleCallCredentials implements CallCredentials {
}
}
class EmptyCallCredentials implements CallCredentials {
class EmptyCallCredentials extends CallCredentials {
generateMetadata(options: CallMetadataOptions): Promise<Metadata> {
return Promise.resolve(new Metadata());
}
@ -74,21 +96,3 @@ class EmptyCallCredentials implements CallCredentials {
return other;
}
}
export namespace CallCredentials {
/**
* Creates a new CallCredentials object from a given function that generates
* Metadata objects.
* @param metadataGenerator A function that accepts a set of options, and
* generates a Metadata object based on these options, which is passed back
* to the caller via a supplied (err, metadata) callback.
*/
export function createFromMetadataGenerator(
metadataGenerator: CallMetadataGenerator): CallCredentials {
return new SingleCallCredentials(metadataGenerator);
}
export function createEmpty(): CallCredentials {
return new EmptyCallCredentials();
}
}

View File

@ -9,9 +9,10 @@ import {FilterStackFactory} from './filter-stack';
import {Metadata} from './metadata';
import {ObjectDuplex} from './object-stream';
const {HTTP2_HEADER_STATUS, HTTP2_HEADER_CONTENT_TYPE, NGHTTP2_CANCEL} = http2.constants;
const {HTTP2_HEADER_STATUS, HTTP2_HEADER_CONTENT_TYPE, NGHTTP2_CANCEL} =
http2.constants;
export type Deadline = Date | number;
export type Deadline = Date|number;
export interface CallStreamOptions {
deadline: Deadline;
@ -36,20 +37,19 @@ export interface WriteObject {
/**
* This interface represents a duplex stream associated with a single gRPC call.
*/
export type CallStream = {
cancelWithStatus(status: Status, details: string): void;
getPeer(): string;
export type CallStream = {
cancelWithStatus(status: Status, details: string): void; getPeer(): string;
getDeadline(): Deadline;
getCredentials(): CallCredentials;
/* If the return value is null, the call has not ended yet. Otherwise, it has
* ended with the specified status */
getStatus(): StatusObject|null;
getStatus(): StatusObject | null;
getMethod(): string;
getHost(): string;
} & EmitterAugmentation1<'metadata', Metadata>
& EmitterAugmentation1<'status', StatusObject>
& ObjectDuplex<WriteObject, Buffer>;
}&EmitterAugmentation1<'metadata', Metadata>&
EmitterAugmentation1<'status', StatusObject>&
ObjectDuplex<WriteObject, Buffer>;
enum ReadState {
NO_DATA,
@ -60,7 +60,7 @@ enum ReadState {
const emptyBuffer = Buffer.alloc(0);
export class Http2CallStream extends Duplex implements CallStream {
public filterStack: Filter;
filterStack: Filter;
private statusEmitted = false;
private http2Stream: http2.ClientHttp2Stream|null = null;
private pendingRead = false;
@ -76,7 +76,7 @@ export class Http2CallStream extends Duplex implements CallStream {
private readPartialMessage: Buffer[] = [];
private readMessageRemaining = 0;
private unpushedReadMessages: (Buffer|null)[] = [];
private unpushedReadMessages: Array<Buffer|null> = [];
// Status code mapped from :status. To be used if grpc-status is not received
private mappedStatusCode: Status = Status.UNKNOWN;
@ -124,20 +124,21 @@ export class Http2CallStream extends Duplex implements CallStream {
}
private handleTrailers(headers: http2.IncomingHttpHeaders) {
let code: Status = this.mappedStatusCode;
let details = '';
const code: Status = this.mappedStatusCode;
const details = '';
let metadata: Metadata;
try {
metadata = Metadata.fromHttp2Headers(headers);
} catch (e) {
metadata = new Metadata();
}
let status: StatusObject = {code, details, metadata};
const status: StatusObject = {code, details, metadata};
this.handlingTrailers = (async () => {
let finalStatus;
try {
// Attempt to assign final status.
finalStatus = await this.filterStack.receiveTrailers(Promise.resolve(status));
finalStatus =
await this.filterStack.receiveTrailers(Promise.resolve(status));
} catch (error) {
await this.handlingHeaders;
// This is a no-op if the call was already ended when handling headers.
@ -195,17 +196,26 @@ export class Http2CallStream extends Duplex implements CallStream {
try {
metadata = Metadata.fromHttp2Headers(headers);
} catch (error) {
this.endCall({code: Status.UNKNOWN, details: error.message, metadata: new Metadata()});
this.endCall({
code: Status.UNKNOWN,
details: error.message,
metadata: new Metadata()
});
return;
}
this.handlingHeaders =
this.filterStack.receiveMetadata(Promise.resolve(metadata))
.then((finalMetadata) => {
this.emit('metadata', finalMetadata);
}).catch((error) => {
this.destroyHttp2Stream();
this.endCall({code: Status.UNKNOWN, details: error.message, metadata: new Metadata()});
});
this.filterStack.receiveMetadata(Promise.resolve(metadata))
.then((finalMetadata) => {
this.emit('metadata', finalMetadata);
})
.catch((error) => {
this.destroyHttp2Stream();
this.endCall({
code: Status.UNKNOWN,
details: error.message,
metadata: new Metadata()
});
});
}
});
stream.on('trailers', this.handleTrailers.bind(this));
@ -260,6 +270,9 @@ export class Http2CallStream extends Duplex implements CallStream {
canPush = this.tryPush(messageBytes, canPush);
this.readState = ReadState.NO_DATA;
}
break;
default:
throw new Error('This should never happen');
}
}
});
@ -298,7 +311,7 @@ export class Http2CallStream extends Duplex implements CallStream {
// This is OK, because status codes emitted here correspond to more
// catastrophic issues that prevent us from receiving trailers in the
// first place.
this.endCall({code: code, details: details, metadata: new Metadata()});
this.endCall({code, details, metadata: new Metadata()});
});
stream.on('error', (err: Error) => {
this.endCall({
@ -338,7 +351,7 @@ export class Http2CallStream extends Duplex implements CallStream {
// If trailers are currently being processed, the call should be ended
// by handleTrailers instead.
await this.handlingTrailers;
this.endCall({code: status, details: details, metadata: new Metadata()});
this.endCall({code: status, details, metadata: new Metadata()});
})();
}

View File

@ -1,27 +1,25 @@
import {EventEmitter} from 'events';
import {EmitterAugmentation1} from './events';
import * as _ from 'lodash';
import {Duplex, Readable, Writable} from 'stream';
import {CallStream, StatusObject, WriteObject} from './call-stream';
import {Status} from './constants';
import {EmitterAugmentation1} from './events';
import {Metadata} from './metadata';
import {ObjectReadable, ObjectWritable} from './object-stream';
import * as _ from 'lodash';
/**
* A type extending the built-in Error object with additional fields.
*/
export type ServiceError = StatusObject & Error;
export type ServiceError = StatusObject&Error;
/**
* A base type for all user-facing values returned by client-side method calls.
*/
export type Call = {
cancel(): void;
getPeer(): string;
} & EmitterAugmentation1<'metadata', Metadata>
& EmitterAugmentation1<'status', StatusObject>
& EventEmitter;
cancel(): void; getPeer(): string;
}&EmitterAugmentation1<'metadata', Metadata>&
EmitterAugmentation1<'status', StatusObject>&EventEmitter;
/**
* A type representing the return value of a unary method call.
@ -33,22 +31,23 @@ export type ClientUnaryCall = Call;
*/
export type ClientReadableStream<ResponseType> = {
deserialize: (chunk: Buffer) => ResponseType;
} & Call & ObjectReadable<ResponseType>;
}&Call&ObjectReadable<ResponseType>;
/**
* A type representing the return value of a client stream method call.
*/
export type ClientWritableStream<RequestType> = {
serialize: (value: RequestType) => Buffer;
} & Call & ObjectWritable<RequestType>;
}&Call&ObjectWritable<RequestType>;
/**
* A type representing the return value of a bidirectional stream method call.
*/
export type ClientDuplexStream<RequestType, ResponseType> =
ClientWritableStream<RequestType> & ClientReadableStream<ResponseType>;
ClientWritableStream<RequestType>&ClientReadableStream<ResponseType>;
export class ClientUnaryCallImpl extends EventEmitter implements ClientUnaryCall {
export class ClientUnaryCallImpl extends EventEmitter implements
ClientUnaryCall {
constructor(private readonly call: CallStream) {
super();
call.on('metadata', (metadata: Metadata) => {
@ -89,8 +88,9 @@ function setUpReadableStream<ResponseType>(
call.on('status', (status: StatusObject) => {
if (status.code !== Status.OK) {
const statusName = _.invert(Status)[status.code];
const message: string = `${status.code} ${statusName}: ${status.details}`;
const error: ServiceError = Object.assign(new Error(status.details), status);
const message = `${status.code} ${statusName}: ${status.details}`;
const error: ServiceError =
Object.assign(new Error(status.details), status);
stream.emit('error', error);
}
stream.emit('status', status);
@ -102,7 +102,7 @@ export class ClientReadableStreamImpl<ResponseType> extends Readable implements
ClientReadableStream<ResponseType> {
constructor(
private readonly call: CallStream,
public readonly deserialize: (chunk: Buffer) => ResponseType) {
readonly deserialize: (chunk: Buffer) => ResponseType) {
super({objectMode: true});
call.on('metadata', (metadata: Metadata) => {
this.emit('metadata', metadata);
@ -135,7 +135,7 @@ function tryWrite<RequestType>(
cb(e);
return;
}
const writeObj: WriteObject = {message: message};
const writeObj: WriteObject = {message};
if (!Number.isNaN(flags)) {
writeObj.flags = flags;
}
@ -146,7 +146,7 @@ export class ClientWritableStreamImpl<RequestType> extends Writable implements
ClientWritableStream<RequestType> {
constructor(
private readonly call: CallStream,
public readonly serialize: (value: RequestType) => Buffer) {
readonly serialize: (value: RequestType) => Buffer) {
super({objectMode: true});
call.on('metadata', (metadata: Metadata) => {
this.emit('metadata', metadata);
@ -178,8 +178,8 @@ export class ClientDuplexStreamImpl<RequestType, ResponseType> extends Duplex
implements ClientDuplexStream<RequestType, ResponseType> {
constructor(
private readonly call: CallStream,
public readonly serialize: (value: RequestType) => Buffer,
public readonly deserialize: (chunk: Buffer) => ResponseType) {
readonly serialize: (value: RequestType) => Buffer,
readonly deserialize: (chunk: Buffer) => ResponseType) {
super({objectMode: true});
call.on('metadata', (metadata: Metadata) => {
this.emit('metadata', metadata);

View File

@ -2,90 +2,45 @@ import {createSecureContext, SecureContext} from 'tls';
import {CallCredentials} from './call-credentials';
/**
* A class that contains credentials for communicating over a channel, as well
* as a set of per-call credentials, which are applied to every method call made
* over a channel initialized with an instance of this class.
*/
export interface ChannelCredentials {
/**
* Returns a copy of this object with the included set of per-call credentials
* expanded to include callCredentials.
* @param callCredentials A CallCredentials object to associate with this
* instance.
*/
compose(callCredentials: CallCredentials): ChannelCredentials;
/**
* Gets the set of per-call credentials associated with this instance.
*/
getCallCredentials(): CallCredentials;
/**
* Gets a SecureContext object generated from input parameters if this
* instance was created with createSsl, or null if this instance was created
* with createInsecure.
*/
getSecureContext(): SecureContext|null;
}
abstract class ChannelCredentialsImpl implements ChannelCredentials {
protected callCredentials: CallCredentials;
protected constructor(callCredentials?: CallCredentials) {
this.callCredentials = callCredentials || CallCredentials.createEmpty();
}
abstract compose(callCredentials: CallCredentials): ChannelCredentialsImpl;
getCallCredentials(): CallCredentials {
return this.callCredentials;
}
abstract getSecureContext(): SecureContext|null;
}
class InsecureChannelCredentialsImpl extends ChannelCredentialsImpl {
constructor(callCredentials?: CallCredentials) {
super(callCredentials);
}
compose(callCredentials: CallCredentials): ChannelCredentialsImpl {
throw new Error('Cannot compose insecure credentials');
}
getSecureContext(): SecureContext|null {
return null;
}
}
class SecureChannelCredentialsImpl extends ChannelCredentialsImpl {
secureContext: SecureContext;
constructor(secureContext: SecureContext, callCredentials?: CallCredentials) {
super(callCredentials);
this.secureContext = secureContext;
}
compose(callCredentials: CallCredentials): ChannelCredentialsImpl {
const combinedCallCredentials =
this.callCredentials.compose(callCredentials);
return new SecureChannelCredentialsImpl(
this.secureContext, combinedCallCredentials);
}
getSecureContext(): SecureContext|null {
return this.secureContext;
}
}
// tslint:disable-next-line:no-any
function verifyIsBufferOrNull(obj: any, friendlyName: string): void {
if (obj && !(obj instanceof Buffer)) {
throw new TypeError(`${friendlyName}, if provided, must be a Buffer.`);
}
}
export namespace ChannelCredentials {
/**
* A class that contains credentials for communicating over a channel, as well
* as a set of per-call credentials, which are applied to every method call made
* over a channel initialized with an instance of this class.
*/
export abstract class ChannelCredentials {
protected callCredentials: CallCredentials;
protected constructor(callCredentials?: CallCredentials) {
this.callCredentials = callCredentials || CallCredentials.createEmpty();
}
/**
* Returns a copy of this object with the included set of per-call credentials
* expanded to include callCredentials.
* @param callCredentials A CallCredentials object to associate with this
* instance.
*/
abstract compose(callCredentials: CallCredentials): ChannelCredentials;
/**
* Gets the set of per-call credentials associated with this instance.
*/
getCallCredentials(): CallCredentials {
return this.callCredentials;
}
/**
* Gets a SecureContext object generated from input parameters if this
* instance was created with createSsl, or null if this instance was created
* with createInsecure.
*/
abstract getSecureContext(): SecureContext|null;
/**
* Return a new ChannelCredentials instance with a given set of credentials.
@ -95,7 +50,7 @@ export namespace ChannelCredentials {
* @param privateKey The client certificate private key, if available.
* @param certChain The client certificate key chain, if available.
*/
export function createSsl(
static createSsl(
rootCerts?: Buffer|null, privateKey?: Buffer|null,
certChain?: Buffer|null): ChannelCredentials {
verifyIsBufferOrNull(rootCerts, 'Root certificate');
@ -120,7 +75,41 @@ export namespace ChannelCredentials {
/**
* Return a new ChannelCredentials instance with no credentials.
*/
export function createInsecure(): ChannelCredentials {
static createInsecure(): ChannelCredentials {
return new InsecureChannelCredentialsImpl();
}
}
class InsecureChannelCredentialsImpl extends ChannelCredentials {
constructor(callCredentials?: CallCredentials) {
super(callCredentials);
}
compose(callCredentials: CallCredentials): ChannelCredentials {
throw new Error('Cannot compose insecure credentials');
}
getSecureContext(): SecureContext|null {
return null;
}
}
class SecureChannelCredentialsImpl extends ChannelCredentials {
secureContext: SecureContext;
constructor(secureContext: SecureContext, callCredentials?: CallCredentials) {
super(callCredentials);
this.secureContext = secureContext;
}
compose(callCredentials: CallCredentials): ChannelCredentials {
const combinedCallCredentials =
this.callCredentials.compose(callCredentials);
return new SecureChannelCredentialsImpl(
this.secureContext, combinedCallCredentials);
}
getSecureContext(): SecureContext|null {
return this.secureContext;
}
}

View File

@ -1,6 +1,6 @@
import {EventEmitter} from 'events';
import * as http2 from 'http2';
import {checkServerIdentity, SecureContext, PeerCertificate} from 'tls';
import {checkServerIdentity, PeerCertificate, SecureContext} from 'tls';
import * as url from 'url';
import {CallCredentials} from './call-credentials';
@ -12,9 +12,9 @@ import {Status} from './constants';
import {DeadlineFilterFactory} from './deadline-filter';
import {FilterStackFactory} from './filter-stack';
import {Metadata, MetadataObject} from './metadata';
import { MetadataStatusFilterFactory } from './metadata-status-filter';
import {MetadataStatusFilterFactory} from './metadata-status-filter';
const { version: clientVersion } = require('../../package');
const {version: clientVersion} = require('../../package');
const IDLE_TIMEOUT_MS = 300000;
@ -42,7 +42,7 @@ export interface ChannelOptions {
'grpc.primary_user_agent': string;
'grpc.secondary_user_agent': string;
'grpc.default_authority': string;
[key: string]: string | number;
[key: string]: string|number;
}
export enum ConnectivityState {
@ -53,7 +53,7 @@ export enum ConnectivityState {
SHUTDOWN
}
function uniformRandom(min:number, max: number) {
function uniformRandom(min: number, max: number) {
return Math.random() * (max - min) + min;
}
@ -71,6 +71,7 @@ export interface Channel extends EventEmitter {
getConnectivityState(): ConnectivityState;
close(): void;
/* tslint:disable:no-any */
addListener(event: string, listener: Function): this;
emit(event: string|symbol, ...args: any[]): boolean;
on(event: string, listener: Function): this;
@ -78,6 +79,7 @@ export interface Channel extends EventEmitter {
prependListener(event: string, listener: Function): this;
prependOnceListener(event: string, listener: Function): this;
removeListener(event: string, listener: Function): this;
/* tslint:enable:no-any */
}
export class Http2Channel extends EventEmitter implements Channel {
@ -92,54 +94,65 @@ export class Http2Channel extends EventEmitter implements Channel {
private subChannel: http2.ClientHttp2Session|null = null;
private filterStackFactory: FilterStackFactory;
private subChannelConnectCallback: ()=>void = () => {};
private subChannelCloseCallback: ()=>void = () => {};
private subChannelConnectCallback: () => void = () => {};
private subChannelCloseCallback: () => void = () => {};
private backoffTimerId: NodeJS.Timer;
private currentBackoff: number = INITIAL_BACKOFF_MS;
private currentBackoffDeadline: Date;
private handleStateChange(oldState: ConnectivityState, newState: ConnectivityState): void {
let now: Date = new Date();
switch(newState) {
case ConnectivityState.CONNECTING:
if (oldState === ConnectivityState.IDLE) {
this.currentBackoff = INITIAL_BACKOFF_MS;
this.currentBackoffDeadline = new Date(now.getTime() + INITIAL_BACKOFF_MS);
} else if (oldState === ConnectivityState.TRANSIENT_FAILURE) {
this.currentBackoff = Math.min(this.currentBackoff * BACKOFF_MULTIPLIER, MAX_BACKOFF_MS);
let jitterMagnitude: number = BACKOFF_JITTER * this.currentBackoff;
this.currentBackoffDeadline = new Date(now.getTime() + this.currentBackoff + uniformRandom(-jitterMagnitude, jitterMagnitude));
}
this.startConnecting();
break;
case ConnectivityState.READY:
this.emit('connect');
break;
case ConnectivityState.TRANSIENT_FAILURE:
this.subChannel = null;
this.backoffTimerId = setTimeout(() => {
this.transitionToState([ConnectivityState.TRANSIENT_FAILURE], ConnectivityState.CONNECTING);
}, this.currentBackoffDeadline.getTime() - now.getTime());
break;
case ConnectivityState.IDLE:
case ConnectivityState.SHUTDOWN:
if (this.subChannel) {
this.subChannel.close();
this.subChannel.removeListener('connect', this.subChannelConnectCallback);
this.subChannel.removeListener('close', this.subChannelCloseCallback);
private handleStateChange(
oldState: ConnectivityState, newState: ConnectivityState): void {
const now: Date = new Date();
switch (newState) {
case ConnectivityState.CONNECTING:
if (oldState === ConnectivityState.IDLE) {
this.currentBackoff = INITIAL_BACKOFF_MS;
this.currentBackoffDeadline =
new Date(now.getTime() + INITIAL_BACKOFF_MS);
} else if (oldState === ConnectivityState.TRANSIENT_FAILURE) {
this.currentBackoff = Math.min(
this.currentBackoff * BACKOFF_MULTIPLIER, MAX_BACKOFF_MS);
const jitterMagnitude: number = BACKOFF_JITTER * this.currentBackoff;
this.currentBackoffDeadline = new Date(
now.getTime() + this.currentBackoff +
uniformRandom(-jitterMagnitude, jitterMagnitude));
}
this.startConnecting();
break;
case ConnectivityState.READY:
this.emit('connect');
break;
case ConnectivityState.TRANSIENT_FAILURE:
this.subChannel = null;
this.emit('shutdown');
clearTimeout(this.backoffTimerId);
}
break;
this.backoffTimerId = setTimeout(() => {
this.transitionToState(
[ConnectivityState.TRANSIENT_FAILURE],
ConnectivityState.CONNECTING);
}, this.currentBackoffDeadline.getTime() - now.getTime());
break;
case ConnectivityState.IDLE:
case ConnectivityState.SHUTDOWN:
if (this.subChannel) {
this.subChannel.close();
this.subChannel.removeListener(
'connect', this.subChannelConnectCallback);
this.subChannel.removeListener('close', this.subChannelCloseCallback);
this.subChannel = null;
this.emit('shutdown');
clearTimeout(this.backoffTimerId);
}
break;
default:
throw new Error('This should never happen');
}
}
// Transition from any of a set of oldStates to a specific newState
private transitionToState(oldStates: ConnectivityState[], newState: ConnectivityState): void {
private transitionToState(
oldStates: ConnectivityState[], newState: ConnectivityState): void {
if (oldStates.indexOf(this.connectivityState) > -1) {
let oldState: ConnectivityState = this.connectivityState;
const oldState: ConnectivityState = this.connectivityState;
this.connectivityState = newState;
this.handleStateChange(oldState, newState);
this.emit('connectivityStateChanged', newState);
@ -148,55 +161,59 @@ export class Http2Channel extends EventEmitter implements Channel {
private startConnecting(): void {
let subChannel: http2.ClientHttp2Session;
let secureContext = this.credentials.getSecureContext();
const secureContext = this.credentials.getSecureContext();
if (secureContext === null) {
subChannel = http2.connect(this.target);
} else {
const connectionOptions: http2.SecureClientSessionOptions = {
secureContext,
}
};
// If provided, the value of grpc.ssl_target_name_override should be used
// to override the target hostname when checking server identity.
// This option is used for testing only.
if (this.options['grpc.ssl_target_name_override']) {
const sslTargetNameOverride = this.options['grpc.ssl_target_name_override']!;
connectionOptions.checkServerIdentity = (host: string, cert: PeerCertificate): Error | undefined => {
return checkServerIdentity(sslTargetNameOverride, cert);
}
const sslTargetNameOverride =
this.options['grpc.ssl_target_name_override']!;
connectionOptions.checkServerIdentity =
(host: string, cert: PeerCertificate): Error|undefined => {
return checkServerIdentity(sslTargetNameOverride, cert);
};
connectionOptions.servername = sslTargetNameOverride;
}
subChannel = http2.connect(this.target, connectionOptions);
}
this.subChannel = subChannel;
let now = new Date();
let connectionTimeout: number = Math.max(
this.currentBackoffDeadline.getTime() - now.getTime(),
MIN_CONNECT_TIMEOUT_MS);
let connectionTimerId: NodeJS.Timer = setTimeout(() => {
// This should trigger the 'close' event, which will send us back to TRANSIENT_FAILURE
const now = new Date();
const connectionTimeout: number = Math.max(
this.currentBackoffDeadline.getTime() - now.getTime(),
MIN_CONNECT_TIMEOUT_MS);
const connectionTimerId: NodeJS.Timer = setTimeout(() => {
// This should trigger the 'close' event, which will send us back to
// TRANSIENT_FAILURE
subChannel.close();
}, connectionTimeout);
this.subChannelConnectCallback = () => {
// Connection succeeded
clearTimeout(connectionTimerId);
this.transitionToState([ConnectivityState.CONNECTING], ConnectivityState.READY);
this.transitionToState(
[ConnectivityState.CONNECTING], ConnectivityState.READY);
};
subChannel.once('connect', this.subChannelConnectCallback);
this.subChannelCloseCallback = () => {
// Connection failed
clearTimeout(connectionTimerId);
/* TODO(murgatroid99): verify that this works for CONNECTING->TRANSITIVE_FAILURE
* see nodejs/node#16645 */
this.transitionToState([ConnectivityState.CONNECTING, ConnectivityState.READY],
ConnectivityState.TRANSIENT_FAILURE);
/* TODO(murgatroid99): verify that this works for
* CONNECTING->TRANSITIVE_FAILURE see nodejs/node#16645 */
this.transitionToState(
[ConnectivityState.CONNECTING, ConnectivityState.READY],
ConnectivityState.TRANSIENT_FAILURE);
};
subChannel.once('close', this.subChannelCloseCallback);
subChannel.once('error', this.subChannelCloseCallback);
}
constructor(
address: string,
public readonly credentials: ChannelCredentials,
address: string, readonly credentials: ChannelCredentials,
private readonly options: Partial<ChannelOptions>) {
super();
if (credentials.getSecureContext() === null) {
@ -212,8 +229,7 @@ export class Http2Channel extends EventEmitter implements Channel {
}
this.filterStackFactory = new FilterStackFactory([
new CompressionFilterFactory(this),
new CallCredentialsFilterFactory(this),
new DeadlineFilterFactory(this),
new CallCredentialsFilterFactory(this), new DeadlineFilterFactory(this),
new MetadataStatusFilterFactory(this)
]);
this.currentBackoffDeadline = new Date();
@ -224,60 +240,60 @@ export class Http2Channel extends EventEmitter implements Channel {
// Build user-agent string.
this.userAgent = [
options['grpc.primary_user_agent'],
`grpc-node-js/${clientVersion}`,
options['grpc.primary_user_agent'], `grpc-node-js/${clientVersion}`,
options['grpc.secondary_user_agent']
].filter(e => e).join(' '); // remove falsey values first
].filter(e => e).join(' '); // remove falsey values first
}
private startHttp2Stream(
authority: string,
methodName: string,
stream: Http2CallStream,
authority: string, methodName: string, stream: Http2CallStream,
metadata: Metadata) {
let finalMetadata: Promise<Metadata> =
const finalMetadata: Promise<Metadata> =
stream.filterStack.sendMetadata(Promise.resolve(metadata.clone()));
Promise.all([finalMetadata, this.connect()])
.then(([metadataValue]) => {
let headers = metadataValue.toHttp2Headers();
headers[HTTP2_HEADER_AUTHORITY] = authority;
headers[HTTP2_HEADER_USER_AGENT] = this.userAgent;
headers[HTTP2_HEADER_CONTENT_TYPE] = 'application/grpc';
headers[HTTP2_HEADER_METHOD] = 'POST';
headers[HTTP2_HEADER_PATH] = methodName;
headers[HTTP2_HEADER_TE] = 'trailers';
if (this.connectivityState === ConnectivityState.READY) {
const session: http2.ClientHttp2Session = this.subChannel!;
// Prevent the HTTP/2 session from keeping the process alive.
// Note: this function is only available in Node 9
session.unref();
stream.attachHttp2Stream(session.request(headers));
} else {
/* In this case, we lost the connection while finalizing
* metadata. That should be very unusual */
setImmediate(() => {
this.startHttp2Stream(authority, methodName, stream, metadata);
});
}
}).catch((error: Error & { code: number }) => {
// We assume the error code isn't 0 (Status.OK)
stream.cancelWithStatus(error.code || Status.UNKNOWN,
`Getting metadata from plugin failed with error: ${error.message}`);
});
.then(([metadataValue]) => {
const headers = metadataValue.toHttp2Headers();
headers[HTTP2_HEADER_AUTHORITY] = authority;
headers[HTTP2_HEADER_USER_AGENT] = this.userAgent;
headers[HTTP2_HEADER_CONTENT_TYPE] = 'application/grpc';
headers[HTTP2_HEADER_METHOD] = 'POST';
headers[HTTP2_HEADER_PATH] = methodName;
headers[HTTP2_HEADER_TE] = 'trailers';
if (this.connectivityState === ConnectivityState.READY) {
const session: http2.ClientHttp2Session = this.subChannel!;
// Prevent the HTTP/2 session from keeping the process alive.
// Note: this function is only available in Node 9
session.unref();
stream.attachHttp2Stream(session.request(headers));
} else {
/* In this case, we lost the connection while finalizing
* metadata. That should be very unusual */
setImmediate(() => {
this.startHttp2Stream(authority, methodName, stream, metadata);
});
}
})
.catch((error: Error&{code: number}) => {
// We assume the error code isn't 0 (Status.OK)
stream.cancelWithStatus(
error.code || Status.UNKNOWN,
`Getting metadata from plugin failed with error: ${
error.message}`);
});
}
createStream(methodName: string, metadata: Metadata, options: CallOptions):
CallStream {
CallStream {
if (this.connectivityState === ConnectivityState.SHUTDOWN) {
throw new Error('Channel has been shut down');
}
let finalOptions: CallStreamOptions = {
const finalOptions: CallStreamOptions = {
deadline: options.deadline === undefined ? Infinity : options.deadline,
credentials: options.credentials || CallCredentials.createEmpty(),
flags: options.flags || 0,
host: options.host || this.defaultAuthority
};
let stream: Http2CallStream =
const stream: Http2CallStream =
new Http2CallStream(methodName, finalOptions, this.filterStackFactory);
this.startHttp2Stream(finalOptions.host, methodName, stream, metadata);
return stream;
@ -298,7 +314,8 @@ export class Http2Channel extends EventEmitter implements Channel {
// been (connectivityState === IDLE).
if (!this.connecting) {
this.connecting = new Promise((resolve, reject) => {
this.transitionToState([ConnectivityState.IDLE], ConnectivityState.CONNECTING);
this.transitionToState(
[ConnectivityState.IDLE], ConnectivityState.CONNECTING);
const onConnect = () => {
this.connecting = null;
this.removeListener('shutdown', onShutdown);
@ -325,9 +342,11 @@ export class Http2Channel extends EventEmitter implements Channel {
if (this.connectivityState === ConnectivityState.SHUTDOWN) {
throw new Error('Channel has been shut down');
}
this.transitionToState([ConnectivityState.CONNECTING,
ConnectivityState.READY,
ConnectivityState.TRANSIENT_FAILURE,
ConnectivityState.IDLE], ConnectivityState.SHUTDOWN);
this.transitionToState(
[
ConnectivityState.CONNECTING, ConnectivityState.READY,
ConnectivityState.TRANSIENT_FAILURE, ConnectivityState.IDLE
],
ConnectivityState.SHUTDOWN);
}
}

View File

@ -21,7 +21,7 @@ export interface UnaryCallback<ResponseType> {
* clients.
*/
export class Client {
private readonly [kChannel]: Channel;
private readonly[kChannel]: Channel;
constructor(
address: string, credentials: ChannelCredentials,
options: Partial<ChannelOptions> = {}) {
@ -34,24 +34,26 @@ export class Client {
waitForReady(deadline: Date|number, callback: (error: Error|null) => void):
void {
let cb: (error: Error|null) => void = once(callback);
let callbackCalled = false;
let timer: NodeJS.Timer | null = null;
this[kChannel].connect().then(() => {
if (timer) {
clearTimeout(timer);
}
cb(null);
}, (err: Error) => {
// Rejection occurs if channel is shut down first.
if (timer) {
clearTimeout(timer);
}
cb(err);
});
const cb: (error: Error|null) => void = once(callback);
const callbackCalled = false;
let timer: NodeJS.Timer|null = null;
this[kChannel].connect().then(
() => {
if (timer) {
clearTimeout(timer);
}
cb(null);
},
(err: Error) => {
// Rejection occurs if channel is shut down first.
if (timer) {
clearTimeout(timer);
}
cb(err);
});
if (deadline !== Infinity) {
let timeout: number;
let now: number = (new Date).getTime();
const now: number = (new Date()).getTime();
if (deadline instanceof Date) {
timeout = deadline.getTime() - now;
} else {
@ -94,7 +96,8 @@ export class Client {
if (status.code === Status.OK) {
callback(null, responseMessage as ResponseType);
} else {
const error: ServiceError = Object.assign(new Error(status.details), status);
const error: ServiceError =
Object.assign(new Error(status.details), status);
callback(error);
}
});
@ -156,7 +159,7 @@ export class Client {
const call: CallStream =
this[kChannel].createStream(method, metadata, options);
const message: Buffer = serialize(argument);
const writeObj: WriteObject = {message: message};
const writeObj: WriteObject = {message};
writeObj.flags = options.flags;
call.write(writeObj);
call.end();
@ -238,7 +241,7 @@ export class Client {
const call: CallStream =
this[kChannel].createStream(method, metadata, options);
const message: Buffer = serialize(argument);
const writeObj: WriteObject = {message: message};
const writeObj: WriteObject = {message};
writeObj.flags = options.flags;
call.write(writeObj);
call.end();

View File

@ -4,14 +4,14 @@ import {Status} from './constants';
import {BaseFilter, Filter, FilterFactory} from './filter';
import {Metadata} from './metadata';
const units: [string, number][] =
const units: Array<[string, number]> =
[['m', 1], ['S', 1000], ['M', 60 * 1000], ['H', 60 * 60 * 1000]];
function getDeadline(deadline: number) {
let now = (new Date()).getTime();
let timeoutMs = Math.max(deadline - now, 0);
for (let [unit, factor] of units) {
let amount = timeoutMs / factor;
const now = (new Date()).getTime();
const timeoutMs = Math.max(deadline - now, 0);
for (const [unit, factor] of units) {
const amount = timeoutMs / factor;
if (amount < 1e8) {
return String(Math.ceil(amount)) + unit;
}
@ -20,19 +20,19 @@ function getDeadline(deadline: number) {
}
export class DeadlineFilter extends BaseFilter implements Filter {
private timer: NodeJS.Timer | null = null;
private timer: NodeJS.Timer|null = null;
private deadline: number;
constructor(
private readonly channel: Http2Channel,
private readonly callStream: CallStream) {
super();
let callDeadline = callStream.getDeadline();
const callDeadline = callStream.getDeadline();
if (callDeadline instanceof Date) {
this.deadline = callDeadline.getTime();
} else {
this.deadline = callDeadline;
}
let now: number = (new Date()).getTime();
const now: number = (new Date()).getTime();
let timeout = this.deadline - now;
if (timeout < 0) {
timeout = 0;

View File

@ -23,7 +23,9 @@ export interface EmitterAugmentation2<Name extends string|symbol, Arg1, Arg2> {
emit(event: Name, arg1: Arg1, arg2: Arg2): boolean;
on(event: Name, listener: (arg1: Arg1, arg2: Arg2) => void): this;
once(event: Name, listener: (arg1: Arg1, arg2: Arg2) => void): this;
prependListener(event: Name, listener: (arg1: Arg1, arg2: Arg2) => void): this;
prependOnceListener(event: Name, listener: (arg1: Arg1, arg2: Arg2) => void): this;
prependListener(event: Name, listener: (arg1: Arg1, arg2: Arg2) => void):
this;
prependOnceListener(event: Name, listener: (arg1: Arg1, arg2: Arg2) => void):
this;
removeListener(event: Name, listener: (arg1: Arg1, arg2: Arg2) => void): this;
}

View File

@ -25,7 +25,7 @@ export class FilterStack implements Filter {
}
export class FilterStackFactory implements FilterFactory<FilterStack> {
constructor(private readonly factories: FilterFactory<any>[]) {}
constructor(private readonly factories: Array<FilterFactory<Filter>>) {}
createFilter(callStream: CallStream): FilterStack {
return new FilterStack(

View File

@ -1,68 +1,78 @@
import { CallCredentials } from './call-credentials';
import { ChannelCredentials } from './channel-credentials';
import { Client } from './client';
import { Status} from './constants';
import { makeClientConstructor, loadPackageDefinition } from './make-client';
import { Metadata } from './metadata';
import { IncomingHttpHeaders } from 'http';
import {IncomingHttpHeaders} from 'http';
import {CallCredentials} from './call-credentials';
import {ChannelCredentials} from './channel-credentials';
import {Client} from './client';
import {Status} from './constants';
import {loadPackageDefinition, makeClientConstructor} from './make-client';
import {Metadata} from './metadata';
export interface OAuth2Client {
getRequestMetadata: (url: string, callback: (err: Error|null, headers?: { Authorization: string }) => void) => void;
getRequestMetadata: (url: string, callback: (err: Error|null, headers?: {
Authorization: string
}) => void) => void;
}
/**** Client Credentials ****/
// Using assign only copies enumerable properties, which is what we want
export const credentials = Object.assign({
/**
* Create a gRPC credential from a Google credential object.
* @param googleCredentials The authentication client to use.
* @return The resulting CallCredentials object.
*/
createFromGoogleCredential: (googleCredentials: OAuth2Client): CallCredentials => {
return CallCredentials.createFromMetadataGenerator((options, callback) => {
googleCredentials.getRequestMetadata(options.service_url, (err, headers) => {
if (err) {
callback(err);
return;
}
const metadata = new Metadata();
metadata.add('authorization', headers!.Authorization);
callback(null, metadata);
});
});
},
export const credentials = Object.assign(
{
/**
* Create a gRPC credential from a Google credential object.
* @param googleCredentials The authentication client to use.
* @return The resulting CallCredentials object.
*/
createFromGoogleCredential: (googleCredentials: OAuth2Client):
CallCredentials => {
return CallCredentials.createFromMetadataGenerator(
(options, callback) => {
googleCredentials.getRequestMetadata(
options.service_url, (err, headers) => {
if (err) {
callback(err);
return;
}
const metadata = new Metadata();
metadata.add('authorization', headers!.Authorization);
callback(null, metadata);
});
});
},
/**
* Combine a ChannelCredentials with any number of CallCredentials into a
* single ChannelCredentials object.
* @param channelCredentials The ChannelCredentials object.
* @param callCredentials Any number of CallCredentials objects.
* @return The resulting ChannelCredentials object.
*/
combineChannelCredentials: (
channelCredentials: ChannelCredentials,
...callCredentials: CallCredentials[]): ChannelCredentials => {
return callCredentials.reduce((acc, other) => acc.compose(other), channelCredentials);
},
/**
* Combine a ChannelCredentials with any number of CallCredentials into a
* single ChannelCredentials object.
* @param channelCredentials The ChannelCredentials object.
* @param callCredentials Any number of CallCredentials objects.
* @return The resulting ChannelCredentials object.
*/
combineChannelCredentials:
(channelCredentials: ChannelCredentials,
...callCredentials: CallCredentials[]): ChannelCredentials => {
return callCredentials.reduce(
(acc, other) => acc.compose(other), channelCredentials);
},
/**
* Combine any number of CallCredentials into a single CallCredentials object.
* @param first The first CallCredentials object.
* @param additional Any number of additional CallCredentials objects.
* @return The resulting CallCredentials object.
*/
combineCallCredentials: (
first: CallCredentials,
...additional: CallCredentials[]): CallCredentials => {
return additional.reduce((acc, other) => acc.compose(other), first);
}
}, ChannelCredentials, CallCredentials);
/**
* Combine any number of CallCredentials into a single CallCredentials
* object.
* @param first The first CallCredentials object.
* @param additional Any number of additional CallCredentials objects.
* @return The resulting CallCredentials object.
*/
combineCallCredentials: (
first: CallCredentials, ...additional: CallCredentials[]):
CallCredentials => {
return additional.reduce((acc, other) => acc.compose(other), first);
}
},
ChannelCredentials, CallCredentials);
/**** Metadata ****/
export { Metadata };
export {Metadata};
/**** Constants ****/
@ -90,3 +100,59 @@ export const waitForClientReady =
(client: Client, deadline: Date|number,
callback: (error: Error|null) => void) =>
client.waitForReady(deadline, callback);
/**** Unimplemented function stubs ****/
/* tslint:disable:no-any variable-name */
export const loadObject = (value: any, options: any) => {
throw new Error(
'Not available in this library. Use @grpc/proto-loader and loadPackageDefinition instead');
};
export const load = (filename: any, format: any, options: any) => {
throw new Error(
'Not available in this library. Use @grpc/proto-loader and loadPackageDefinition instead');
};
export const setLogger = (logger: any) => {
throw new Error('Not yet implemented');
};
export const setLogVerbosity = (verbosity: any) => {
throw new Error('Not yet implemented');
};
export const Server = (options: any) => {
throw new Error('Not yet implemented');
};
export const ServerCredentials = {
createSsl:
(rootCerts: any, keyCertPairs: any, checkClientCertificate: any) => {
throw new Error('Not yet implemented');
},
createInsecure: () => {
throw new Error('Not yet implemented');
}
};
export const getClientChannel = (client: any) => {
throw new Error('Not available in this library');
};
export const StatusBuilder = () => {
throw new Error('Not yet implemented');
};
export const ListenerBuilder = () => {
throw new Error('Not yet implemented');
};
export const InterceptorBuilder = () => {
throw new Error('Not yet implemented');
};
export const InterceptingCall = () => {
throw new Error('Not yet implemented');
};

View File

@ -1,17 +1,14 @@
import { Metadata } from "./metadata";
import { Client, UnaryCallback } from "./client";
import { CallOptions } from "./call-stream";
import * as _ from 'lodash';
import { ChannelCredentials } from "./channel-credentials";
import { ChannelOptions } from "./channel";
export interface Serialize<T> {
(value: T): Buffer;
}
import {CallOptions} from './call-stream';
import {ChannelOptions} from './channel';
import {ChannelCredentials} from './channel-credentials';
import {Client, UnaryCallback} from './client';
import {Metadata} from './metadata';
export interface Deserialize<T> {
(bytes: Buffer): T;
}
export interface Serialize<T> { (value: T): Buffer; }
export interface Deserialize<T> { (bytes: Buffer): T; }
export interface MethodDefinition<RequestType, ResponseType> {
path: string;
@ -28,18 +25,11 @@ export interface ServiceDefinition {
[index: string]: MethodDefinition<object, object>;
}
export interface PackageDefinition {
[index: string]: ServiceDefinition;
}
export interface PackageDefinition { [index: string]: ServiceDefinition; }
function getDefaultValues<T>(metadata?: Metadata, options?: T): {
metadata: Metadata;
options: Partial<T>;
} {
return {
metadata: metadata || new Metadata(),
options: options || {}
};
function getDefaultValues<T>(metadata?: Metadata, options?: T):
{metadata: Metadata; options: Partial<T>;} {
return {metadata: metadata || new Metadata(), options: options || {}};
}
/**
@ -60,9 +50,9 @@ export interface ServiceClient extends Client {
export interface ServiceClientConstructor {
new(address: string, credentials: ChannelCredentials,
options?: Partial<ChannelOptions>): ServiceClient;
options?: Partial<ChannelOptions>): ServiceClient;
service: ServiceDefinition;
};
}
/**
* Creates a constructor for a client with the given methods, as specified in
@ -111,23 +101,24 @@ export function makeClientConstructor(
}
const serialize = attrs.requestSerialize;
const deserialize = attrs.responseDeserialize;
const methodFunc = _.partial(requesterFuncs[methodType], attrs.path,
serialize, deserialize);
const methodFunc = _.partial(
requesterFuncs[methodType], attrs.path, serialize, deserialize);
ServiceClientImpl.prototype[name] = methodFunc;
// Associate all provided attributes with the method
_.assign(ServiceClientImpl.prototype[name], attrs);
if (attrs.originalName) {
ServiceClientImpl.prototype[attrs.originalName] = ServiceClientImpl.prototype[name];
ServiceClientImpl.prototype[attrs.originalName] =
ServiceClientImpl.prototype[name];
}
});
ServiceClientImpl.service = methods;
return ServiceClientImpl;
};
}
export type GrpcObject = {
[index: string]: GrpcObject | ServiceClientConstructor;
[index: string]: GrpcObject|ServiceClientConstructor;
};
/**
@ -135,20 +126,23 @@ export type GrpcObject = {
* @param packageDef The package definition object.
* @return The resulting gRPC object.
*/
export function loadPackageDefinition(packageDef: PackageDefinition): GrpcObject {
export function loadPackageDefinition(packageDef: PackageDefinition):
GrpcObject {
const result: GrpcObject = {};
for (const serviceFqn in packageDef) {
const service = packageDef[serviceFqn];
const nameComponents = serviceFqn.split('.');
const serviceName = nameComponents[nameComponents.length-1];
let current = result;
for (const packageName of nameComponents.slice(0, -1)) {
if (!current[packageName]) {
current[packageName] = {};
if (packageDef.hasOwnProperty(serviceFqn)) {
const service = packageDef[serviceFqn];
const nameComponents = serviceFqn.split('.');
const serviceName = nameComponents[nameComponents.length - 1];
let current = result;
for (const packageName of nameComponents.slice(0, -1)) {
if (!current[packageName]) {
current[packageName] = {};
}
current = current[packageName] as GrpcObject;
}
current = current[packageName] as GrpcObject;
current[serviceName] = makeClientConstructor(service, serviceName, {});
}
current[serviceName] = makeClientConstructor(service, serviceName, {});
}
return result;
}

View File

@ -1,19 +1,20 @@
import {CallStream} from './call-stream';
import {Channel} from './channel';
import {BaseFilter, Filter, FilterFactory} from './filter';
import {StatusObject} from './call-stream';
import {Channel} from './channel';
import {Status} from './constants';
import {BaseFilter, Filter, FilterFactory} from './filter';
export class MetadataStatusFilter extends BaseFilter implements Filter {
async receiveTrailers(status: Promise<StatusObject>): Promise<StatusObject> {
let { code, details, metadata } = await status;
// tslint:disable-next-line:prefer-const
let {code, details, metadata} = await status;
if (code !== Status.UNKNOWN) {
// we already have a known status, so don't assign a new one.
return { code, details, metadata };
return {code, details, metadata};
}
const metadataMap = metadata.getMap();
if (typeof metadataMap['grpc-status'] === 'string') {
let receivedCode = Number(metadataMap['grpc-status']);
const receivedCode = Number(metadataMap['grpc-status']);
if (receivedCode in Status) {
code = receivedCode;
}
@ -23,7 +24,7 @@ export class MetadataStatusFilter extends BaseFilter implements Filter {
details = decodeURI(metadataMap['grpc-message'] as string);
metadata.remove('grpc-message');
}
return { code, details, metadata };
return {code, details, metadata};
}
}

View File

@ -1,9 +1,9 @@
import * as http2 from 'http2';
import {forOwn} from 'lodash';
export type MetadataValue = string | Buffer;
export type MetadataValue = string|Buffer;
export interface MetadataObject { [key: string]: Array<MetadataValue>; }
export interface MetadataObject { [key: string]: MetadataValue[]; }
function cloneMetadataObject(repr: MetadataObject): MetadataObject {
const result: MetadataObject = {};
@ -113,7 +113,7 @@ export class Metadata {
* @param key The key whose value should be retrieved.
* @return A list of values associated with the given key.
*/
get(key: string): Array<MetadataValue> {
get(key: string): MetadataValue[] {
key = normalizeKey(key);
validate(key);
if (Object.prototype.hasOwnProperty.call(this.internalRepr, key)) {
@ -144,7 +144,7 @@ export class Metadata {
* @return The newly cloned object.
*/
clone(): Metadata {
let newMetadata = new Metadata();
const newMetadata = new Metadata();
newMetadata.internalRepr = cloneMetadataObject(this.internalRepr);
return newMetadata;
}
@ -181,7 +181,7 @@ export class Metadata {
});
return result;
}
// For compatibility with the other Metadata implementation
private _getCoreRepresentation() {
return this.internalRepr;
@ -201,8 +201,9 @@ export class Metadata {
result.add(key, Buffer.from(value, 'base64'));
});
} else if (values !== undefined) {
values.split(',').map(v => v.trim()).forEach(v =>
result.add(key, Buffer.from(v, 'base64')));
values.split(',')
.map(v => v.trim())
.forEach(v => result.add(key, Buffer.from(v, 'base64')));
}
} else {
if (Array.isArray(values)) {
@ -210,8 +211,7 @@ export class Metadata {
result.add(key, value);
});
} else if (values !== undefined) {
values.split(',').map(v => v.trim()).forEach(v =>
result.add(key, v));
values.split(',').map(v => v.trim()).forEach(v => result.add(key, v));
}
}
});

View File

@ -1,14 +1,15 @@
import {Duplex, Readable, Writable} from 'stream';
import {EmitterAugmentation1} from './events';
// tslint:disable:no-any
export interface IntermediateObjectReadable<T> extends Readable {
read(size?: number): any&T;
}
export type ObjectReadable<T> = {
read(size?: number): T;
} & EmitterAugmentation1<'data', T>
& IntermediateObjectReadable<T>;
}&EmitterAugmentation1<'data', T>&IntermediateObjectReadable<T>;
export interface IntermediateObjectWritable<T> extends Writable {
_write(chunk: any&T, encoding: string, callback: Function): void;
@ -39,4 +40,4 @@ export type ObjectDuplex<T, U> = {
end(): void;
end(chunk: T, cb?: Function): void;
end(chunk: T, encoding?: any, cb?: Function): void;
} & Duplex & ObjectWritable<T> & ObjectReadable<U>;
}&Duplex&ObjectWritable<T>&ObjectReadable<U>;

View File

@ -4,6 +4,7 @@ export function mockFunction(): never {
throw new Error('Not implemented');
}
// tslint:disable-next-line:no-namespace
export namespace assert2 {
const toCall = new Map<() => void, number>();
const afterCallsQueue: Array<() => void> = [];
@ -17,7 +18,9 @@ export namespace assert2 {
try {
return fn();
} catch (e) {
assert.throws(() => {throw e});
assert.throws(() => {
throw e;
});
throw e; // for type safety only
}
}
@ -42,7 +45,9 @@ export namespace assert2 {
* Wraps a function to keep track of whether it was called or not.
* @param fn The function to wrap.
*/
export function mustCall<T>(fn: (...args: any[]) => T): (...args: any[]) => T {
// tslint:disable:no-any
export function mustCall<T>(fn: (...args: any[]) => T):
(...args: any[]) => T {
const existingValue = toCall.get(fn);
if (existingValue !== undefined) {
toCall.set(fn, existingValue + 1);
@ -62,6 +67,7 @@ export namespace assert2 {
return result;
};
}
// tslint:enable:no-any
/**
* Calls the given function when every function that was wrapped with

View File

@ -27,7 +27,8 @@ describe('CallCredentials', () => {
describe('createFromMetadataGenerator', () => {
it('should accept a metadata generator', () => {
assert.doesNotThrow(
() => CallCredentials.createFromMetadataGenerator(generateFromServiceURL));
() => CallCredentials.createFromMetadataGenerator(
generateFromServiceURL));
});
});
@ -58,11 +59,12 @@ describe('CallCredentials', () => {
describe('generateMetadata', () => {
it('should call the function passed to createFromMetadataGenerator',
async () => {
const callCredentials =
CallCredentials.createFromMetadataGenerator(generateFromServiceURL);
const callCredentials = CallCredentials.createFromMetadataGenerator(
generateFromServiceURL);
let metadata: Metadata;
try {
metadata = await callCredentials.generateMetadata({service_url: 'foo'});
metadata =
await callCredentials.generateMetadata({service_url: 'foo'});
} catch (err) {
throw err;
}

View File

@ -1,32 +1,33 @@
import * as assert from 'assert';
import { CallCredentials } from '../src/call-credentials';
import { Http2CallStream } from '../src/call-stream';
import { mockFunction, assert2 } from './common';
import { Status } from '../src/constants';
import { EventEmitter } from 'events';
import { FilterStackFactory } from '../src/filter-stack';
import {EventEmitter} from 'events';
import * as http2 from 'http2';
import { forOwn, range } from 'lodash';
import { Metadata } from '../src/metadata';
import {forOwn, range} from 'lodash';
import * as stream from 'stream';
import {CallCredentials} from '../src/call-credentials';
import {Http2CallStream} from '../src/call-stream';
import {Status} from '../src/constants';
import {FilterStackFactory} from '../src/filter-stack';
import {Metadata} from '../src/metadata';
import {assert2, mockFunction} from './common';
interface DataFrames {
payload: Buffer,
frameLengths: number[]
payload: Buffer;
frameLengths: number[];
}
const {
HTTP2_HEADER_STATUS
} = http2.constants;
const {HTTP2_HEADER_STATUS} = http2.constants;
function serialize(data: string): Buffer {
const header: Buffer = Buffer.alloc(5);
header.writeUInt8(0, 0); // TODO: Uncompressed only
header.writeUInt8(0, 0); // TODO: Uncompressed only
header.writeInt32BE(data.length, 1);
return Buffer.concat([header, Buffer.from(data, 'utf8')]);
}
class ClientHttp2StreamMock extends stream.Duplex implements http2.ClientHttp2Stream {
class ClientHttp2StreamMock extends stream.Duplex implements
http2.ClientHttp2Stream {
constructor(private readonly dataFrames: DataFrames) {
super();
}
@ -38,13 +39,15 @@ class ClientHttp2StreamMock extends stream.Duplex implements http2.ClientHttp2St
}
bytesRead = 0;
dataFrame = 0;
aborted: boolean = false;
closed: boolean = false;
destroyed: boolean = false;
pending: boolean = false;
rstCode: number = 0;
aborted = false;
closed = false;
destroyed = false;
pending = false;
rstCode = 0;
// tslint:disable:no-any
session: http2.Http2Session = {} as any;
state: http2.StreamState = {} as any;
// tslint:enable:no-any
close = mockFunction;
priority = mockFunction;
rstStream = mockFunction;
@ -58,7 +61,7 @@ class ClientHttp2StreamMock extends stream.Duplex implements http2.ClientHttp2St
if (this.dataFrame === this.dataFrames.frameLengths.length) {
if (this.bytesRead < this.dataFrames.payload.length) {
this.push(this.dataFrames.payload.slice(
this.bytesRead, this.dataFrames.payload.length));
this.bytesRead, this.dataFrames.payload.length));
}
this.push(null);
return;
@ -66,7 +69,7 @@ class ClientHttp2StreamMock extends stream.Duplex implements http2.ClientHttp2St
const from = this.bytesRead;
this.bytesRead += this.dataFrames.frameLengths[this.dataFrame++];
this.push(this.dataFrames.payload.slice(from, this.bytesRead));
};
}
_write(chunk: Buffer, encoding: string, cb: Function) {
this.emit('write', chunk);
cb();
@ -81,28 +84,28 @@ describe('CallStream', () => {
host: ''
};
const filterStackFactory = new FilterStackFactory([]);
const message = 'eat this message'; // 16 bytes
const message = 'eat this message'; // 16 bytes
beforeEach(() => {
assert2.clearMustCalls();
});
it('should emit a metadata event when it receives a response event', (done) => {
const responseMetadata = new Metadata();
responseMetadata.add('key', 'value');
const callStream = new Http2CallStream('foo', callStreamArgs, filterStackFactory);
it('should emit a metadata event when it receives a response event',
(done) => {
const responseMetadata = new Metadata();
responseMetadata.add('key', 'value');
const callStream =
new Http2CallStream('foo', callStreamArgs, filterStackFactory);
const http2Stream = new ClientHttp2StreamMock({
payload: Buffer.alloc(0),
frameLengths: []
});
callStream.once('metadata', assert2.mustCall((metadata) => {
assert.deepStrictEqual(metadata.get('key'), ['value']);
}));
callStream.attachHttp2Stream(http2Stream);
http2Stream.emitResponse(200, responseMetadata);
assert2.afterMustCallsSatisfied(done);
});
const http2Stream = new ClientHttp2StreamMock(
{payload: Buffer.alloc(0), frameLengths: []});
callStream.once('metadata', assert2.mustCall((metadata) => {
assert.deepStrictEqual(metadata.get('key'), ['value']);
}));
callStream.attachHttp2Stream(http2Stream);
http2Stream.emitResponse(200, responseMetadata);
assert2.afterMustCallsSatisfied(done);
});
describe('should end a call with an error if a stream was closed', () => {
const c = http2.constants;
@ -126,14 +129,13 @@ describe('CallStream', () => {
keys.forEach((key) => {
const value = errorCodeMapping[key];
// A null value indicates: behavior isn't specified, so skip this test.
let maybeSkip = (fn: typeof it) => value ? fn : fn.skip;
const maybeSkip = (fn: typeof it) => value ? fn : fn.skip;
maybeSkip(it)(`for error code ${key}`, () => {
return new Promise((resolve, reject) => {
const callStream = new Http2CallStream('foo', callStreamArgs, filterStackFactory);
const http2Stream = new ClientHttp2StreamMock({
payload: Buffer.alloc(0),
frameLengths: []
});
const callStream =
new Http2CallStream('foo', callStreamArgs, filterStackFactory);
const http2Stream = new ClientHttp2StreamMock(
{payload: Buffer.alloc(0), frameLengths: []});
callStream.attachHttp2Stream(http2Stream);
callStream.once('status', (status) => {
try {
@ -150,7 +152,8 @@ describe('CallStream', () => {
});
it('should have functioning getters', (done) => {
const callStream = new Http2CallStream('foo', callStreamArgs, filterStackFactory);
const callStream =
new Http2CallStream('foo', callStreamArgs, filterStackFactory);
assert.strictEqual(callStream.getDeadline(), callStreamArgs.deadline);
assert.strictEqual(callStream.getCredentials(), callStreamArgs.credentials);
assert.strictEqual(callStream.getStatus(), null);
@ -166,11 +169,10 @@ describe('CallStream', () => {
describe('attachHttp2Stream', () => {
it('should handle an empty message', (done) => {
const callStream = new Http2CallStream('foo', callStreamArgs, filterStackFactory);
const http2Stream = new ClientHttp2StreamMock({
payload: serialize(''),
frameLengths: []
});
const callStream =
new Http2CallStream('foo', callStreamArgs, filterStackFactory);
const http2Stream =
new ClientHttp2StreamMock({payload: serialize(''), frameLengths: []});
callStream.once('data', assert2.mustCall((buffer) => {
assert.strictEqual(buffer.toString('utf8'), '');
}));
@ -178,65 +180,57 @@ describe('CallStream', () => {
assert2.afterMustCallsSatisfied(done);
});
[
{
description: 'all data is supplied in a single frame',
frameLengths: []
},
{
description: 'frames are split along header field delimiters',
frameLengths: [1, 4]
},
{
description: 'portions of header fields are split between different frames',
frameLengths: [2, 1, 1, 4]
},
{
description: 'frames are split into bytes',
frameLengths: range(0, 20).map(() => 1)
}
].forEach((testCase: { description: string, frameLengths: number[] }) => {
it(`should handle a short message where ${testCase.description}`, (done) => {
const callStream = new Http2CallStream('foo', callStreamArgs, filterStackFactory);
const http2Stream = new ClientHttp2StreamMock({
payload: serialize(message), // 21 bytes
frameLengths: testCase.frameLengths
});
callStream.once('data', assert2.mustCall((buffer) => {
assert.strictEqual(buffer.toString('utf8'), message);
}));
callStream.once('end', assert2.mustCall(() => {}));
callStream.attachHttp2Stream(http2Stream);
assert2.afterMustCallsSatisfied(done);
});
[{description: 'all data is supplied in a single frame', frameLengths: []},
{
description: 'frames are split along header field delimiters',
frameLengths: [1, 4]
},
{
description:
'portions of header fields are split between different frames',
frameLengths: [2, 1, 1, 4]
},
{
description: 'frames are split into bytes',
frameLengths: range(0, 20).map(() => 1)
}].forEach((testCase: {description: string, frameLengths: number[]}) => {
it(`should handle a short message where ${testCase.description}`,
(done) => {
const callStream =
new Http2CallStream('foo', callStreamArgs, filterStackFactory);
const http2Stream = new ClientHttp2StreamMock({
payload: serialize(message), // 21 bytes
frameLengths: testCase.frameLengths
});
callStream.once('data', assert2.mustCall((buffer) => {
assert.strictEqual(buffer.toString('utf8'), message);
}));
callStream.once('end', assert2.mustCall(() => {}));
callStream.attachHttp2Stream(http2Stream);
assert2.afterMustCallsSatisfied(done);
});
});
[
{
description: 'all data is supplied in a single frame',
frameLengths: []
},
{
description: 'frames are split between delimited messages',
frameLengths: [21]
},
{
description: 'frames are split within messages',
frameLengths: [10, 22]
},
{
description: 'part of 2nd message\'s header is in first frame',
frameLengths: [24]
},
{
description: 'frames are split into bytes',
frameLengths: range(0, 41).map(() => 1)
}
].forEach((testCase: { description: string, frameLengths: number[] }) => {
[{description: 'all data is supplied in a single frame', frameLengths: []},
{
description: 'frames are split between delimited messages',
frameLengths: [21]
},
{description: 'frames are split within messages', frameLengths: [10, 22]},
{
description: 'part of 2nd message\'s header is in first frame',
frameLengths: [24]
},
{
description: 'frames are split into bytes',
frameLengths: range(0, 41).map(() => 1)
}].forEach((testCase: {description: string, frameLengths: number[]}) => {
it(`should handle two messages where ${testCase.description}`, (done) => {
const callStream = new Http2CallStream('foo', callStreamArgs, filterStackFactory);
const callStream =
new Http2CallStream('foo', callStreamArgs, filterStackFactory);
const http2Stream = new ClientHttp2StreamMock({
payload: Buffer.concat([serialize(message), serialize(message)]), // 42 bytes
payload: Buffer.concat(
[serialize(message), serialize(message)]), // 42 bytes
frameLengths: testCase.frameLengths
});
callStream.once('data', assert2.mustCall((buffer) => {
@ -252,11 +246,10 @@ describe('CallStream', () => {
});
it('should send buffered writes', (done) => {
const callStream = new Http2CallStream('foo', callStreamArgs, filterStackFactory);
const http2Stream = new ClientHttp2StreamMock({
payload: Buffer.alloc(0),
frameLengths: []
});
const callStream =
new Http2CallStream('foo', callStreamArgs, filterStackFactory);
const http2Stream = new ClientHttp2StreamMock(
{payload: Buffer.alloc(0), frameLengths: []});
let streamFlushed = false;
http2Stream.once('write', assert2.mustCall((chunk: Buffer) => {
const dataLength = chunk.readInt32BE(1);
@ -265,9 +258,7 @@ describe('CallStream', () => {
assert.strictEqual(encodedMessage, message);
streamFlushed = true;
}));
callStream.write({
message: Buffer.from(message)
}, assert2.mustCall(() => {
callStream.write({message: Buffer.from(message)}, assert2.mustCall(() => {
// Ensure this is called only after contents are written to http2Stream
assert.ok(streamFlushed);
}));
@ -276,32 +267,30 @@ describe('CallStream', () => {
assert2.afterMustCallsSatisfied(done);
});
it('should cause data chunks in write calls afterward to be written to the given stream', (done) => {
const callStream = new Http2CallStream('foo', callStreamArgs, filterStackFactory);
const http2Stream = new ClientHttp2StreamMock({
payload: Buffer.alloc(0),
frameLengths: []
});
http2Stream.once('write', assert2.mustCall((chunk: Buffer) => {
const dataLength = chunk.readInt32BE(1);
const encodedMessage = chunk.slice(5).toString('utf8');
assert.strictEqual(dataLength, message.length);
assert.strictEqual(encodedMessage, message);
}));
callStream.attachHttp2Stream(http2Stream);
callStream.write({
message: Buffer.from(message)
}, assert2.mustCall(() => {}));
callStream.end(assert2.mustCall(() => {}));
assert2.afterMustCallsSatisfied(done);
});
it('should cause data chunks in write calls afterward to be written to the given stream',
(done) => {
const callStream =
new Http2CallStream('foo', callStreamArgs, filterStackFactory);
const http2Stream = new ClientHttp2StreamMock(
{payload: Buffer.alloc(0), frameLengths: []});
http2Stream.once('write', assert2.mustCall((chunk: Buffer) => {
const dataLength = chunk.readInt32BE(1);
const encodedMessage = chunk.slice(5).toString('utf8');
assert.strictEqual(dataLength, message.length);
assert.strictEqual(encodedMessage, message);
}));
callStream.attachHttp2Stream(http2Stream);
callStream.write(
{message: Buffer.from(message)}, assert2.mustCall(() => {}));
callStream.end(assert2.mustCall(() => {}));
assert2.afterMustCallsSatisfied(done);
});
it('should handle underlying stream errors', () => {
const callStream = new Http2CallStream('foo', callStreamArgs, filterStackFactory);
const http2Stream = new ClientHttp2StreamMock({
payload: Buffer.alloc(0),
frameLengths: []
});
const callStream =
new Http2CallStream('foo', callStreamArgs, filterStackFactory);
const http2Stream = new ClientHttp2StreamMock(
{payload: Buffer.alloc(0), frameLengths: []});
callStream.once('status', assert2.mustCall((status) => {
assert.strictEqual(status.code, Status.INTERNAL);
}));

View File

@ -32,6 +32,7 @@ class CallCredentialsMock implements CallCredentials {
}
}
// tslint:disable-next-line:no-any
const readFile: (...args: any[]) => Promise<Buffer> = promisify(fs.readFile);
// A promise which resolves to loaded files in the form { ca, key, cert }
const pFixtures = Promise

View File

@ -1,2 +1,3 @@
settings:
'#': It's possible to have node_version here as a key to override the core's version.
'node_version': 1.11.1

View File

@ -78,6 +78,13 @@ declare module "grpc" {
*/
export function load<T = GrpcObject>(filename: Filename, format?: "proto" | "json", options?: LoadOptions): T;
/**
* Load a gRPC package definition as a gRPC object hierarchy
* @param packageDef The package definition object
* @return The resulting gRPC object
*/
export function loadPackageDefinition(packageDefinition: PackageDefinition): GrpcObject;
/**
* A filename
*/
@ -235,12 +242,18 @@ declare module "grpc" {
/**
* An object that completely defines a service.
* @typedef {Object.<string, grpc~MethodDefinition>} grpc~ServiceDefinition
*/
export type ServiceDefinition<ImplementationType> = {
readonly [I in keyof ImplementationType]: MethodDefinition<any, any>;
}
/**
* An object that defines a package containing multiple services
*/
export type PackageDefinition = {
readonly [fullyQualifiedName: string]: ServiceDefinition<any>;
}
/**
* An object that completely defines a service method signature.
*/
@ -1259,4 +1272,197 @@ declare module "grpc" {
* @param clientObj The client to close
*/
export function closeClient(clientObj: Client): void;
/**
* A builder for gRPC status objects
*/
export class StatusBuilder {
constructor()
/**
* Adds a status code to the builder
* @param code The status code
*/
withCode(code: number): this;
/**
* Adds details to the builder
* @param details A status message
*/
withDetails(details: string): this;
/**
* Adds metadata to the builder
* @param metadata The gRPC status metadata
*/
withMetadata(metadata: Metadata): this;
/**
* Builds the status object
* @return A gRPC status
*/
build(): StatusObject;
}
export type MetadataListener = (metadata: Metadata, next: Function) => void;
export type MessageListener = (message: any, next: Function) => void;
export type StatusListener = (status: StatusObject, next: Function) => void;
export interface Listener {
onReceiveMetadata?: MetadataListener;
onReceiveMessage?: MessageListener;
onReceiveStatus?: StatusListener;
}
/**
* A builder for listener interceptors
*/
export class ListenerBuilder {
constructor();
/**
* Adds onReceiveMetadata method to the builder
* @param onReceiveMetadata A listener method for receiving metadata
*/
withOnReceiveMetadata(onReceiveMetadata: MetadataListener): this;
/**
* Adds onReceiveMessage method to the builder
* @param onReceiveMessage A listener method for receiving message
*/
withOnReceiveMessage(onReceiveMessage: MessageListener): this;
/**
* Adds onReceiveStatus method to the builder
* @param onReceiveStatus A listener method for receiving status
*/
withOnReceiveStatus(onReceiveStatus: StatusListener): this;
/**
* Builds the call listener
*/
build(): Listener;
}
export type MetadataRequester = (metadata: Metadata, listener: Listener, next: Function) => void;
export type MessageRequester = (message: any, next: Function) => void;
export type CloseRequester = (next: Function) => void;
export type CancelRequester = (next: Function) => void;
export type GetPeerRequester = (next: Function) => string;
export interface Requester {
start?: MetadataRequester;
sendMessage?: MessageRequester;
halfClose?: CloseRequester;
cancel?: CancelRequester;
getPeer?: GetPeerRequester;
}
/**
* A builder for the outbound methods of an interceptor
*/
export class RequesterBuilder {
constructor();
/**
* Add a metadata requester to the builder
* @param start A requester method for handling metadata
*/
withStart(start: MetadataRequester): this;
/**
* Add a message requester to the builder.
* @param sendMessage A requester method for handling
* messages.
*/
withSendMessage(sendMessage: MessageRequester): this;
/**
* Add a close requester to the builder.
* @param halfClose A requester method for handling client
* close.
*/
withHalfClose(halfClose: CloseRequester): this;
/**
* Add a cancel requester to the builder.
* @param cancel A requester method for handling `cancel`
*/
withCancel(cancel: CancelRequester): this;
/**
* Builds the requester's interceptor methods.
*/
build(): Requester;
}
/**
* A chainable gRPC call proxy which will delegate to an optional requester
* object. By default, interceptor methods will chain to nextCall. If a
* requester is provided which implements an interceptor method, that
* requester method will be executed as part of the chain.
* operations.
*/
export class InterceptingCall {
/**
* @param next_Call The next call in the chain
* @param requester Interceptor methods to handle request
*/
constructor(nextCall: InterceptingCall|null, requester?: Requester);
/**
* Starts a call through the outbound interceptor chain and adds an element to
* the reciprocal inbound listener chain.
*/
start(metadata: Metadata, listener: Listener): void;
/**
* Pass a message through the interceptor chain.
*/
sendMessage(message: any): void;
/**
* Run a close operation through the interceptor chain
*/
halfClose(): void;
/**
* Run a cancel operation through the interceptor chain
*/
cancel(): void;
/**
* Run a cancelWithStatus operation through the interceptor chain.
* @param status
* @param message
*/
cancelWithStatus(status: StatusObject, message: string): void;
/**
* Pass a getPeer call down to the base gRPC call (should not be intercepted)
*/
getPeer(): object;
/**
* For streaming calls, we need to transparently pass the stream's context
* through the interceptor chain. Passes the context between InterceptingCalls
* but hides it from any requester implementations.
* @param context Carries objects needed for streaming operations.
* @param message The message to send.
*/
sendMessageWithContext(context: object, message: any): void;
/**
* For receiving streaming messages, we need to seed the base interceptor with
* the streaming context to create a RECV_MESSAGE batch.
* @param context Carries objects needed for streaming operations
*/
recvMessageWithContext(context: object): void;
}
}

View File

@ -1,6 +1,6 @@
{
"name": "grpc",
"version": "1.12.0-dev",
"version": "1.11.1",
"author": "Google Inc.",
"description": "gRPC Library for Node",
"homepage": "https://grpc.io/",
@ -30,7 +30,7 @@
"dependencies": {
"lodash": "^4.15.0",
"nan": "^2.0.0",
"node-pre-gyp": "0.7.0",
"node-pre-gyp": "^0.10.0",
"protobufjs": "^5.0.0"
},
"devDependencies": {
@ -71,6 +71,7 @@
"deps/grpc/third_party/boringssl/crypto/**/*.{c,cc,h}",
"deps/grpc/third_party/boringssl/include/**/*.{c,cc,h}",
"deps/grpc/third_party/boringssl/ssl/**/*.{c,cc,h}",
"deps/grpc/third_party/boringssl/third_party/**/*.{c,h}",
"deps/grpc/third_party/abseil-cpp/absl/**/*.{h,hh}",
"binding.gyp"
],

View File

@ -32,7 +32,7 @@
"dependencies": {
"lodash": "^4.15.0",
"nan": "^2.0.0",
"node-pre-gyp": "0.7.0",
"node-pre-gyp": "^0.10.0",
"protobufjs": "^5.0.0"
},
"devDependencies": {
@ -73,6 +73,7 @@
"deps/grpc/third_party/boringssl/crypto/**/*.{c,cc,h}",
"deps/grpc/third_party/boringssl/include/**/*.{c,cc,h}",
"deps/grpc/third_party/boringssl/ssl/**/*.{c,cc,h}",
"deps/grpc/third_party/boringssl/third_party/**/*.{c,h}",
"deps/grpc/third_party/abseil-cpp/absl/**/*.{h,hh}",
"binding.gyp"
],

View File

@ -1,2 +1,2 @@
FROM node:8-alpine
FROM node:10-alpine
RUN apk add --no-cache python curl bash build-base

View File

@ -14,9 +14,9 @@
set arch_list=ia32 x64
set node_versions=4.0.0 5.0.0 6.0.0 7.0.0 8.0.0 9.0.0
set node_versions=4.0.0 5.0.0 6.0.0 7.0.0 8.0.0 9.0.0 10.0.0
set electron_versions=1.0.0 1.1.0 1.2.0 1.3.0 1.4.0 1.5.0 1.6.0 1.7.0 1.8.0
set electron_versions=1.0.0 1.1.0 1.2.0 1.3.0 1.4.0 1.5.0 1.6.0 1.7.0 1.8.0 2.0.0
set PATH=%PATH%;C:\Program Files\nodejs\;%APPDATA%\npm

View File

@ -13,12 +13,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.
set -ex
arch_list=( ia32 x64 )
node_versions=( 4.0.0 5.0.0 6.0.0 7.0.0 8.0.0 9.0.0 )
electron_versions=( 1.0.0 1.1.0 1.2.0 1.3.0 1.4.0 1.5.0 1.6.0 1.7.0 1.8.0 )
node_versions=( 4.0.0 5.0.0 6.0.0 7.0.0 8.0.0 9.0.0 10.0.0 )
electron_versions=( 1.0.0 1.1.0 1.2.0 1.3.0 1.4.0 1.5.0 1.6.0 1.7.0 1.8.0 2.0.0 )
while true ; do
case $1 in

View File

@ -15,6 +15,9 @@
set -ex
# https://github.com/mapbox/node-pre-gyp/issues/362
npm install -g node-gyp
cd $(dirname $0)/../../..
rm -rf build || true
@ -23,7 +26,7 @@ mkdir -p "${ARTIFACTS_OUT}"
npm update
node_versions=( 4.0.0 5.0.0 6.0.0 7.0.0 8.0.0 9.0.0 )
node_versions=( 4.0.0 5.0.0 6.0.0 7.0.0 8.0.0 9.0.0 10.0.0 )
for version in ${node_versions[@]}
do

View File

@ -1,5 +1,7 @@
# gRPC Protobuf Loader
A utility package for loading `.proto` files for use with gRPC, using the latest Protobuf.js package.
## Installation
```sh
@ -13,7 +15,37 @@ const protoLoader = require('@grpc/proto-loader');
const grpcLibrary = require('grpc');
// OR
const grpcLibrary = require('@grpc/grpc-js');
protoLoader.load(protoFile, options).then(packageDefinition => {
protoLoader.load(protoFileName, options).then(packageDefinition => {
const package = grpcLibrary.loadPackageDefinition(packageDefinition);
});
// OR
const packageDefinition = protoLoader.loadSync(protoFileName, options);
const package = grpcLibrary.loadPackageDefinition(packageDefinition);
```
The options parameter is an object that can have the following optional properties:
| Field name | Valid values | Description
|------------|--------------|------------
| `keepCase` | `true` or `false` | Preserve field names. The default is to change them to camel case.
| `longs` | `String` or `Number` | The type to use to represent `long` values. Defaults to a `Long` object type.
| `enums` | `String` | The type to use to represent `enum` values. Defaults to the numeric value.
| `bytes` | `Array` or `String` | The type to use to represent `bytes` values. Defaults to `Buffer`.
| `defaults` | `true` or `false` | Set default values on output objects. Defaults to `false`.
| `arrays` | `true` or `false` | Set empty arrays for missing array values even if `defaults` is `false` Defaults to `false`.
| `objects` | `true` or `false` | Set empty objects for missing object values even if `defaults` is `false` Defaults to `false`.
| `oneofs` | `true` or `false` | Set virtual oneof properties to the present field's name. Defaults to `false`.
| `includeDirs` | An array of strings | A list of search paths for imported `.proto` files.
The following options object closely approximates the existing behavior of `grpc.load`:
```js
const options = {
keepCase: true,
longs: String,
enums: String,
defaults: true,
oneofs: true
}
```

View File

@ -1,8 +1,17 @@
{
"name": "@grpc/proto-loader",
"version": "0.1.0",
"description": "",
"author": "Google Inc.",
"contributors": [
{
"name": "Michael Lumish",
"email": "mlumish@google.com"
}
],
"description": "gRPC utility library for loading .proto files",
"homepage": "https://grpc.io/",
"main": "build/src/index.js",
"typings": "build/src/index.d.ts",
"scripts": {
"build": "npm run compile",
"clean": "gts clean",
@ -20,7 +29,6 @@
"type": "git",
"url": "https://github.com/grpc/grpc-node.git"
},
"author": "",
"license": "Apache-2.0",
"bugs": {
"url": "https://github.com/grpc/grpc-node/issues"
@ -32,10 +40,12 @@
"dependencies": {
"@types/lodash": "^4.14.104",
"@types/node": "^9.4.6",
"lodash": "^4.17.5",
"protobufjs": "^6.8.6"
},
"devDependencies": {
"clang-format": "^1.2.2",
"gts": "^0.5.3",
"lodash": "^4.17.5",
"protobufjs": "^6.8.6",
"typescript": "~2.7.2"
}
}

View File

@ -48,7 +48,7 @@ export interface PackageDefinition {
}
export type Options = Protobuf.IParseOptions & Protobuf.IConversionOptions & {
include?: string[];
includeDirs?: string[];
};
function joinName(baseName: string, name: string): string {
@ -154,15 +154,15 @@ function addIncludePathResolver(root: Protobuf.Root, includePaths: string[]) {
* `defaults` is `false`. Defaults to `false`.
* @param options.oneofs Set virtual oneof properties to the present field's
* name
* @param options.include Paths to search for imported `.proto` files.
* @param options.includeDirs Paths to search for imported `.proto` files.
*/
export function load(filename: string, options: Options): Promise<PackageDefinition> {
const root: Protobuf.Root = new Protobuf.Root();
if (!!options.include) {
if (!(options.include instanceof Array)) {
return Promise.reject(new Error('The include option must be an array'));
if (!!options.includeDirs) {
if (!(options.includeDirs instanceof Array)) {
return Promise.reject(new Error('The includeDirs option must be an array'));
}
addIncludePathResolver(root, options.include as string[]);
addIncludePathResolver(root, options.includeDirs as string[]);
}
return root.load(filename, options).then((loadedRoot) => {
loadedRoot.resolveAll();
@ -172,11 +172,11 @@ export function load(filename: string, options: Options): Promise<PackageDefinit
export function loadSync(filename: string, options: Options): PackageDefinition {
const root: Protobuf.Root = new Protobuf.Root();
if (!!options.include) {
if (!(options.include instanceof Array)) {
if (!!options.includeDirs) {
if (!(options.includeDirs instanceof Array)) {
throw new Error('The include option must be an array');
}
addIncludePathResolver(root, options.include as string[]);
addIncludePathResolver(root, options.includeDirs as string[]);
}
const loadedRoot = root.loadSync(filename, options);
loadedRoot.resolveAll();

View File

@ -39,7 +39,9 @@ SET FAILED=0
for %%v in (4 6 7 8 9) do (
call nvm install %%v
call nvm use %%v
call npm install -g npm
if "%%v"=="4" (
call npm install -g npm@5
)
@rem https://github.com/mapbox/node-pre-gyp/issues/362
call npm install -g node-gyp
node -e "console.log(process.versions)"

View File

@ -29,7 +29,7 @@ var protoPackage = protoLoader.loadSync(
{keepCase: true,
defaults: true,
enums: String,
include: [__dirname + '/../../packages/grpc-native-core/deps/grpc']});
includeDirs: [__dirname + '/../../packages/grpc-native-core/deps/grpc']});
var testProto = grpc.loadPackageDefinition(protoPackage).grpc.testing;
var assert = require('assert');

View File

@ -30,7 +30,7 @@ var protoPackage = protoLoader.loadSync(
{keepCase: true,
defaults: true,
enums: String,
include: [__dirname + '/../../packages/grpc-native-core/deps/grpc']});
includeDirs: [__dirname + '/../../packages/grpc-native-core/deps/grpc']});
var testProto = grpc.loadPackageDefinition(protoPackage).grpc.testing;
var ECHO_INITIAL_KEY = 'x-grpc-test-echo-initial';

View File

@ -14,6 +14,13 @@
@echo "Starting Windows build"
powershell -c "& { iwr https://raw.githubusercontent.com/grumpycoders/nvm-ps/master/nvm.ps1 | iex }"
SET PATH=%APPDATA%\nvm-ps;%APPDATA%\nvm-ps\nodejs;%PATH%
call nvm install 10
call nvm use 10
call npm install -g npm
@rem https://github.com/mapbox/node-pre-gyp/issues/362
call npm install -g node-gyp

View File

@ -13,6 +13,13 @@
# See the License for the specific language governing permissions and
# limitations under the License.
curl -o- https://raw.githubusercontent.com/creationix/nvm/v0.33.11/install.sh | bash
export NVM_DIR="$HOME/.nvm"
[ -s "$NVM_DIR/nvm.sh" ] && \. "$NVM_DIR/nvm.sh"
nvm install 10
nvm use 10
npm install -g npm
# https://github.com/mapbox/node-pre-gyp/issues/362
npm install -g node-gyp