grpc-js: run gts fix for src

This commit is contained in:
Patrick Remy 2020-04-09 11:54:09 +02:00
parent f4e295cdce
commit b84d2f3b39
No known key found for this signature in database
GPG Key ID: FE25C0B14C0500CD
18 changed files with 435 additions and 281 deletions

View File

@ -79,7 +79,7 @@ class ComposedCallCredentials extends CallCredentials {
async generateMetadata(options: CallMetadataOptions): Promise<Metadata> { async generateMetadata(options: CallMetadataOptions): Promise<Metadata> {
const base: Metadata = new Metadata(); const base: Metadata = new Metadata();
const generated: Metadata[] = await Promise.all( const generated: Metadata[] = await Promise.all(
this.creds.map(cred => cred.generateMetadata(options)) this.creds.map((cred) => cred.generateMetadata(options))
); );
for (const gen of generated) { for (const gen of generated) {
base.merge(gen); base.merge(gen);

View File

@ -111,7 +111,7 @@ export class InterceptingListenerImpl implements InterceptingListener {
) {} ) {}
onReceiveMetadata(metadata: Metadata): void { onReceiveMetadata(metadata: Metadata): void {
this.listener.onReceiveMetadata(metadata, metadata => { this.listener.onReceiveMetadata(metadata, (metadata) => {
this.nextListener.onReceiveMetadata(metadata); this.nextListener.onReceiveMetadata(metadata);
}); });
} }
@ -119,7 +119,7 @@ export class InterceptingListenerImpl implements InterceptingListener {
/* If this listener processes messages asynchronously, the last message may /* If this listener processes messages asynchronously, the last message may
* be reordered with respect to the status */ * be reordered with respect to the status */
this.processingMessage = true; this.processingMessage = true;
this.listener.onReceiveMessage(message, msg => { this.listener.onReceiveMessage(message, (msg) => {
this.processingMessage = false; this.processingMessage = false;
this.nextListener.onReceiveMessage(msg); this.nextListener.onReceiveMessage(msg);
if (this.pendingStatus) { if (this.pendingStatus) {
@ -128,7 +128,7 @@ export class InterceptingListenerImpl implements InterceptingListener {
}); });
} }
onReceiveStatus(status: StatusObject): void { onReceiveStatus(status: StatusObject): void {
this.listener.onReceiveStatus(status, processedStatus => { this.listener.onReceiveStatus(status, (processedStatus) => {
if (this.processingMessage) { if (this.processingMessage) {
this.pendingStatus = processedStatus; this.pendingStatus = processedStatus;
} else { } else {
@ -221,7 +221,9 @@ export class Http2CallStream implements Call {
/* Precondition: this.finalStatus !== null */ /* Precondition: this.finalStatus !== null */
if (!this.statusOutput) { if (!this.statusOutput) {
this.statusOutput = true; this.statusOutput = true;
const filteredStatus = this.filterStack.receiveTrailers(this.finalStatus!); const filteredStatus = this.filterStack.receiveTrailers(
this.finalStatus!
);
this.listener!.onReceiveStatus(filteredStatus); this.listener!.onReceiveStatus(filteredStatus);
if (this.subchannel) { if (this.subchannel) {
this.subchannel.callUnref(); this.subchannel.callUnref();
@ -352,7 +354,7 @@ export class Http2CallStream implements Call {
private handleTrailers(headers: http2.IncomingHttpHeaders) { private handleTrailers(headers: http2.IncomingHttpHeaders) {
let headersString = ''; let headersString = '';
for (const header of Object.keys(headers)) { for (const header of Object.keys(headers)) {
headersString += '\t\t' + header + ': ' + headers[header] + '\n' headersString += '\t\t' + header + ': ' + headers[header] + '\n';
} }
this.trace('Received server trailers:\n' + headersString); this.trace('Received server trailers:\n' + headersString);
let metadata: Metadata; let metadata: Metadata;
@ -363,7 +365,10 @@ export class Http2CallStream implements Call {
} }
const metadataMap = metadata.getMap(); const metadataMap = metadata.getMap();
let code: Status = this.mappedStatusCode; let code: Status = this.mappedStatusCode;
if (code === Status.UNKNOWN && typeof metadataMap['grpc-status'] === 'string') { if (
code === Status.UNKNOWN &&
typeof metadataMap['grpc-status'] === 'string'
) {
const receivedStatus = Number(metadataMap['grpc-status']); const receivedStatus = Number(metadataMap['grpc-status']);
if (receivedStatus in Status) { if (receivedStatus in Status) {
code = receivedStatus; code = receivedStatus;
@ -375,7 +380,9 @@ export class Http2CallStream implements Call {
if (typeof metadataMap['grpc-message'] === 'string') { if (typeof metadataMap['grpc-message'] === 'string') {
details = decodeURI(metadataMap['grpc-message']); details = decodeURI(metadataMap['grpc-message']);
metadata.remove('grpc-message'); metadata.remove('grpc-message');
this.trace('received status details string "' + details + '" from server'); this.trace(
'received status details string "' + details + '" from server'
);
} }
const status: StatusObject = { code, details, metadata }; const status: StatusObject = { code, details, metadata };
let finalStatus; let finalStatus;
@ -412,7 +419,7 @@ export class Http2CallStream implements Call {
stream.on('response', (headers, flags) => { stream.on('response', (headers, flags) => {
let headersString = ''; let headersString = '';
for (const header of Object.keys(headers)) { for (const header of Object.keys(headers)) {
headersString += '\t\t' + header + ': ' + headers[header] + '\n' headersString += '\t\t' + header + ': ' + headers[header] + '\n';
} }
this.trace('Received server headers:\n' + headersString); this.trace('Received server headers:\n' + headersString);
switch (headers[':status']) { switch (headers[':status']) {
@ -575,7 +582,9 @@ export class Http2CallStream implements Call {
} }
cancelWithStatus(status: Status, details: string): void { cancelWithStatus(status: Status, details: string): void {
this.trace('cancelWithStatus code: ' + status + ' details: "' + details + '"'); this.trace(
'cancelWithStatus code: ' + status + ' details: "' + details + '"'
);
this.destroyHttp2Stream(); this.destroyHttp2Stream();
this.endCall({ code: status, details, metadata: new Metadata() }); this.endCall({ code: status, details, metadata: new Metadata() });
} }
@ -650,7 +659,7 @@ export class Http2CallStream implements Call {
}; };
const cb: WriteCallback = context.callback ?? (() => {}); const cb: WriteCallback = context.callback ?? (() => {});
this.isWriteFilterPending = true; this.isWriteFilterPending = true;
this.filterStack.sendMessage(Promise.resolve(writeObj)).then(message => { this.filterStack.sendMessage(Promise.resolve(writeObj)).then((message) => {
this.isWriteFilterPending = false; this.isWriteFilterPending = false;
if (this.http2Stream === null) { if (this.http2Stream === null) {
this.trace( this.trace(

View File

@ -100,9 +100,7 @@ export class ClientUnaryCallImpl extends EventEmitter
export class ClientReadableStreamImpl<ResponseType> extends Readable export class ClientReadableStreamImpl<ResponseType> extends Readable
implements ClientReadableStream<ResponseType> { implements ClientReadableStream<ResponseType> {
public call?: InterceptingCallInterface; public call?: InterceptingCallInterface;
constructor( constructor(readonly deserialize: (chunk: Buffer) => ResponseType) {
readonly deserialize: (chunk: Buffer) => ResponseType
) {
super({ objectMode: true }); super({ objectMode: true });
} }
@ -122,9 +120,7 @@ export class ClientReadableStreamImpl<ResponseType> extends Readable
export class ClientWritableStreamImpl<RequestType> extends Writable export class ClientWritableStreamImpl<RequestType> extends Writable
implements ClientWritableStream<RequestType> { implements ClientWritableStream<RequestType> {
public call?: InterceptingCallInterface; public call?: InterceptingCallInterface;
constructor( constructor(readonly serialize: (value: RequestType) => Buffer) {
readonly serialize: (value: RequestType) => Buffer
) {
super({ objectMode: true }); super({ objectMode: true });
} }
@ -140,7 +136,7 @@ export class ClientWritableStreamImpl<RequestType> extends Writable
const context: MessageContext = { const context: MessageContext = {
callback: cb, callback: cb,
}; };
const flags: number = Number(encoding); const flags = Number(encoding);
if (!Number.isNaN(flags)) { if (!Number.isNaN(flags)) {
context.flags = flags; context.flags = flags;
} }
@ -179,7 +175,7 @@ export class ClientDuplexStreamImpl<RequestType, ResponseType> extends Duplex
const context: MessageContext = { const context: MessageContext = {
callback: cb, callback: cb,
}; };
const flags: number = Number(encoding); const flags = Number(encoding);
if (!Number.isNaN(flags)) { if (!Number.isNaN(flags)) {
context.flags = flags; context.flags = flags;
} }

View File

@ -144,11 +144,23 @@ export class ChannelImplementation implements Channel {
throw new TypeError('Channel target must be a string'); throw new TypeError('Channel target must be a string');
} }
if (!(credentials instanceof ChannelCredentials)) { if (!(credentials instanceof ChannelCredentials)) {
throw new TypeError('Channel credentials must be a ChannelCredentials object'); throw new TypeError(
'Channel credentials must be a ChannelCredentials object'
);
} }
if (options) { if (options) {
if ((typeof options !== 'object') || !Object.values(options).every(value => typeof value === 'string' || typeof value === 'number' || typeof value === 'undefined')) { if (
throw new TypeError('Channel options must be an object with string or number values'); typeof options !== 'object' ||
!Object.values(options).every(
(value) =>
typeof value === 'string' ||
typeof value === 'number' ||
typeof value === 'undefined'
)
) {
throw new TypeError(
'Channel options must be an object with string or number values'
);
} }
} }
/* The global boolean parameter to getSubchannelPool has the inverse meaning to what /* The global boolean parameter to getSubchannelPool has the inverse meaning to what
@ -265,7 +277,7 @@ export class ChannelImplementation implements Channel {
callStream.filterStack callStream.filterStack
.sendMetadata(Promise.resolve(callMetadata.clone())) .sendMetadata(Promise.resolve(callMetadata.clone()))
.then( .then(
finalMetadata => { (finalMetadata) => {
const subchannelState: ConnectivityState = pickResult.subchannel!.getConnectivityState(); const subchannelState: ConnectivityState = pickResult.subchannel!.getConnectivityState();
if (subchannelState === ConnectivityState.READY) { if (subchannelState === ConnectivityState.READY) {
try { try {
@ -274,7 +286,10 @@ export class ChannelImplementation implements Channel {
callStream callStream
); );
} catch (error) { } catch (error) {
if ((error as NodeJS.ErrnoException).code === 'ERR_HTTP2_GOAWAY_SESSION') { if (
(error as NodeJS.ErrnoException).code ===
'ERR_HTTP2_GOAWAY_SESSION'
) {
/* An error here indicates that something went wrong with /* An error here indicates that something went wrong with
* the picked subchannel's http2 stream right before we * the picked subchannel's http2 stream right before we
* tried to start the stream. We are handling a promise * tried to start the stream. We are handling a promise
@ -308,7 +323,10 @@ export class ChannelImplementation implements Channel {
(error as Error).message + (error as Error).message +
'. Ending call' '. Ending call'
); );
callStream.cancelWithStatus(Status.INTERNAL, 'Failed to start HTTP/2 stream'); callStream.cancelWithStatus(
Status.INTERNAL,
'Failed to start HTTP/2 stream'
);
} }
} }
} else { } else {
@ -360,7 +378,7 @@ export class ChannelImplementation implements Channel {
watcherObject: ConnectivityStateWatcher watcherObject: ConnectivityStateWatcher
) { ) {
const watcherIndex = this.connectivityStateWatchers.findIndex( const watcherIndex = this.connectivityStateWatchers.findIndex(
value => value === watcherObject (value) => value === watcherObject
); );
if (watcherIndex >= 0) { if (watcherIndex >= 0) {
this.connectivityStateWatchers.splice(watcherIndex, 1); this.connectivityStateWatchers.splice(watcherIndex, 1);
@ -450,7 +468,9 @@ export class ChannelImplementation implements Channel {
throw new TypeError('Channel#createCall: method must be a string'); throw new TypeError('Channel#createCall: method must be a string');
} }
if (!(typeof deadline === 'number' || deadline instanceof Date)) { if (!(typeof deadline === 'number' || deadline instanceof Date)) {
throw new TypeError('Channel#createCall: deadline must be a number or Date'); throw new TypeError(
'Channel#createCall: deadline must be a number or Date'
);
} }
if (this.connectivityState === ConnectivityState.SHUTDOWN) { if (this.connectivityState === ConnectivityState.SHUTDOWN) {
throw new Error('Channel has been shut down'); throw new Error('Channel has been shut down');

View File

@ -179,10 +179,10 @@ const defaultRequester: FullRequester = {
sendMessage: (message, next) => { sendMessage: (message, next) => {
next(message); next(message);
}, },
halfClose: next => { halfClose: (next) => {
next(); next();
}, },
cancel: next => { cancel: (next) => {
next(); next();
}, },
}; };
@ -250,13 +250,13 @@ export class InterceptingCall implements InterceptingCallInterface {
const fullInterceptingListener: InterceptingListener = { const fullInterceptingListener: InterceptingListener = {
onReceiveMetadata: onReceiveMetadata:
interceptingListener?.onReceiveMetadata?.bind(interceptingListener) ?? interceptingListener?.onReceiveMetadata?.bind(interceptingListener) ??
(metadata => {}), ((metadata) => {}),
onReceiveMessage: onReceiveMessage:
interceptingListener?.onReceiveMessage?.bind(interceptingListener) ?? interceptingListener?.onReceiveMessage?.bind(interceptingListener) ??
(message => {}), ((message) => {}),
onReceiveStatus: onReceiveStatus:
interceptingListener?.onReceiveStatus?.bind(interceptingListener) ?? interceptingListener?.onReceiveStatus?.bind(interceptingListener) ??
(status => {}), ((status) => {}),
}; };
this.requester.start(metadata, fullInterceptingListener, (md, listener) => { this.requester.start(metadata, fullInterceptingListener, (md, listener) => {
let finalInterceptingListener: InterceptingListener; let finalInterceptingListener: InterceptingListener;
@ -281,7 +281,7 @@ export class InterceptingCall implements InterceptingCallInterface {
} }
sendMessageWithContext(context: MessageContext, message: any): void { sendMessageWithContext(context: MessageContext, message: any): void {
this.processingMessage = true; this.processingMessage = true;
this.requester.sendMessage(message, finalMessage => { this.requester.sendMessage(message, (finalMessage) => {
this.processingMessage = false; this.processingMessage = false;
this.nextCall.sendMessageWithContext(context, finalMessage); this.nextCall.sendMessageWithContext(context, finalMessage);
if (this.pendingHalfClose) { if (this.pendingHalfClose) {
@ -368,10 +368,10 @@ class BaseInterceptingCall implements InterceptingCallInterface {
): void { ): void {
let readError: StatusObject | null = null; let readError: StatusObject | null = null;
this.call.start(metadata, { this.call.start(metadata, {
onReceiveMetadata: metadata => { onReceiveMetadata: (metadata) => {
interceptingListener?.onReceiveMetadata?.(metadata); interceptingListener?.onReceiveMetadata?.(metadata);
}, },
onReceiveMessage: message => { onReceiveMessage: (message) => {
let deserialized: any; let deserialized: any;
try { try {
deserialized = this.methodDefinition.responseDeserialize(message); deserialized = this.methodDefinition.responseDeserialize(message);
@ -385,7 +385,7 @@ class BaseInterceptingCall implements InterceptingCallInterface {
this.call.cancelWithStatus(readError.code, readError.details); this.call.cancelWithStatus(readError.code, readError.details);
} }
}, },
onReceiveStatus: status => { onReceiveStatus: (status) => {
if (readError) { if (readError) {
interceptingListener?.onReceiveStatus?.(readError); interceptingListener?.onReceiveStatus?.(readError);
} else { } else {
@ -415,7 +415,7 @@ class BaseUnaryInterceptingCall extends BaseInterceptingCall
let receivedMessage = false; let receivedMessage = false;
const wrapperListener: InterceptingListener = { const wrapperListener: InterceptingListener = {
onReceiveMetadata: onReceiveMetadata:
listener?.onReceiveMetadata?.bind(listener) ?? (metadata => {}), listener?.onReceiveMetadata?.bind(listener) ?? ((metadata) => {}),
onReceiveMessage: (message: any) => { onReceiveMessage: (message: any) => {
receivedMessage = true; receivedMessage = true;
listener?.onReceiveMessage?.(message); listener?.onReceiveMessage?.(message);
@ -502,21 +502,21 @@ export function getInterceptingCall(
interceptors = ([] as Interceptor[]) interceptors = ([] as Interceptor[])
.concat( .concat(
interceptorArgs.callInterceptors, interceptorArgs.callInterceptors,
interceptorArgs.callInterceptorProviders.map(provider => interceptorArgs.callInterceptorProviders.map((provider) =>
provider(methodDefinition) provider(methodDefinition)
) )
) )
.filter(interceptor => interceptor); .filter((interceptor) => interceptor);
// Filter out falsy values when providers return nothing // Filter out falsy values when providers return nothing
} else { } else {
interceptors = ([] as Interceptor[]) interceptors = ([] as Interceptor[])
.concat( .concat(
interceptorArgs.clientInterceptors, interceptorArgs.clientInterceptors,
interceptorArgs.clientInterceptorProviders.map(provider => interceptorArgs.clientInterceptorProviders.map((provider) =>
provider(methodDefinition) provider(methodDefinition)
) )
) )
.filter(interceptor => interceptor); .filter((interceptor) => interceptor);
// Filter out falsy values when providers return nothing // Filter out falsy values when providers return nothing
} }
const interceptorOptions = Object.assign({}, options, { const interceptorOptions = Object.assign({}, options, {
@ -531,14 +531,10 @@ export function getInterceptingCall(
* channel. */ * channel. */
const getCall: NextCall = interceptors.reduceRight<NextCall>( const getCall: NextCall = interceptors.reduceRight<NextCall>(
(nextCall: NextCall, nextInterceptor: Interceptor) => { (nextCall: NextCall, nextInterceptor: Interceptor) => {
return currentOptions => nextInterceptor(currentOptions, nextCall); return (currentOptions) => nextInterceptor(currentOptions, nextCall);
}, },
(finalOptions: InterceptorOptions) => (finalOptions: InterceptorOptions) =>
getBottomInterceptingCall( getBottomInterceptingCall(channel, finalOptions, methodDefinition)
channel,
finalOptions,
methodDefinition
)
); );
return getCall(interceptorOptions); return getCall(interceptorOptions);
} }

View File

@ -48,7 +48,12 @@ import {
InterceptorArguments, InterceptorArguments,
InterceptingCallInterface, InterceptingCallInterface,
} from './client-interceptors'; } from './client-interceptors';
import { ServerUnaryCall, ServerReadableStream, ServerWritableStream, ServerDuplexStream } from './server-call'; import {
ServerUnaryCall,
ServerReadableStream,
ServerWritableStream,
ServerDuplexStream,
} from './server-call';
const CHANNEL_SYMBOL = Symbol(); const CHANNEL_SYMBOL = Symbol();
const INTERCEPTOR_SYMBOL = Symbol(); const INTERCEPTOR_SYMBOL = Symbol();
@ -62,7 +67,11 @@ export interface UnaryCallback<ResponseType> {
export interface CallOptions { export interface CallOptions {
deadline?: Deadline; deadline?: Deadline;
host?: string; host?: string;
parent?: ServerUnaryCall<any, any> | ServerReadableStream<any, any> | ServerWritableStream<any, any> | ServerDuplexStream<any, any> parent?:
| ServerUnaryCall<any, any>
| ServerReadableStream<any, any>
| ServerWritableStream<any, any>
| ServerDuplexStream<any, any>;
propagate_flags?: number; propagate_flags?: number;
credentials?: CallCredentials; credentials?: CallCredentials;
interceptors?: Interceptor[]; interceptors?: Interceptor[];
@ -76,11 +85,11 @@ export interface CallProperties<RequestType, ResponseType> {
channel: Channel; channel: Channel;
methodDefinition: ClientMethodDefinition<RequestType, ResponseType>; methodDefinition: ClientMethodDefinition<RequestType, ResponseType>;
callOptions: CallOptions; callOptions: CallOptions;
callback?: UnaryCallback<ResponseType> callback?: UnaryCallback<ResponseType>;
} }
export interface CallInvocationTransformer { export interface CallInvocationTransformer {
(callProperties: CallProperties<any, any>): CallProperties<any, any> (callProperties: CallProperties<any, any>): CallProperties<any, any>;
} }
export type ClientOptions = Partial<ChannelOptions> & { export type ClientOptions = Partial<ChannelOptions> & {
@ -123,7 +132,8 @@ export class Client {
'to the client constructor. Only one of these is allowed.' 'to the client constructor. Only one of these is allowed.'
); );
} }
this[CALL_INVOCATION_TRANSFORMER_SYMBOL] = options.callInvocationTransformer; this[CALL_INVOCATION_TRANSFORMER_SYMBOL] =
options.callInvocationTransformer;
delete options.callInvocationTransformer; delete options.callInvocationTransformer;
if (options.channelOverride) { if (options.channelOverride) {
this[CHANNEL_SYMBOL] = options.channelOverride; this[CHANNEL_SYMBOL] = options.channelOverride;
@ -274,17 +284,20 @@ export class Client {
channel: this[CHANNEL_SYMBOL], channel: this[CHANNEL_SYMBOL],
methodDefinition: methodDefinition, methodDefinition: methodDefinition,
callOptions: checkedArguments.options, callOptions: checkedArguments.options,
callback: checkedArguments.callback callback: checkedArguments.callback,
}; };
if (this[CALL_INVOCATION_TRANSFORMER_SYMBOL]) { if (this[CALL_INVOCATION_TRANSFORMER_SYMBOL]) {
callProperties = this[CALL_INVOCATION_TRANSFORMER_SYMBOL]!(callProperties) as CallProperties<RequestType, ResponseType>; callProperties = this[CALL_INVOCATION_TRANSFORMER_SYMBOL]!(
callProperties
) as CallProperties<RequestType, ResponseType>;
} }
const emitter: ClientUnaryCall = callProperties.call; const emitter: ClientUnaryCall = callProperties.call;
const interceptorArgs: InterceptorArguments = { const interceptorArgs: InterceptorArguments = {
clientInterceptors: this[INTERCEPTOR_SYMBOL], clientInterceptors: this[INTERCEPTOR_SYMBOL],
clientInterceptorProviders: this[INTERCEPTOR_PROVIDER_SYMBOL], clientInterceptorProviders: this[INTERCEPTOR_PROVIDER_SYMBOL],
callInterceptors: callProperties.callOptions.interceptors ?? [], callInterceptors: callProperties.callOptions.interceptors ?? [],
callInterceptorProviders: callProperties.callOptions.interceptor_providers ?? [], callInterceptorProviders:
callProperties.callOptions.interceptor_providers ?? [],
}; };
const call: InterceptingCallInterface = getInterceptingCall( const call: InterceptingCallInterface = getInterceptingCall(
interceptorArgs, interceptorArgs,
@ -303,7 +316,7 @@ export class Client {
let responseMessage: ResponseType | null = null; let responseMessage: ResponseType | null = null;
let receivedStatus = false; let receivedStatus = false;
call.start(callProperties.metadata, { call.start(callProperties.metadata, {
onReceiveMetadata: metadata => { onReceiveMetadata: (metadata) => {
emitter.emit('metadata', metadata); emitter.emit('metadata', metadata);
}, },
onReceiveMessage(message: any) { onReceiveMessage(message: any) {
@ -385,17 +398,22 @@ export class Client {
channel: this[CHANNEL_SYMBOL], channel: this[CHANNEL_SYMBOL],
methodDefinition: methodDefinition, methodDefinition: methodDefinition,
callOptions: checkedArguments.options, callOptions: checkedArguments.options,
callback: checkedArguments.callback callback: checkedArguments.callback,
}; };
if (this[CALL_INVOCATION_TRANSFORMER_SYMBOL]) { if (this[CALL_INVOCATION_TRANSFORMER_SYMBOL]) {
callProperties = this[CALL_INVOCATION_TRANSFORMER_SYMBOL]!(callProperties) as CallProperties<RequestType, ResponseType>; callProperties = this[CALL_INVOCATION_TRANSFORMER_SYMBOL]!(
callProperties
) as CallProperties<RequestType, ResponseType>;
} }
const emitter: ClientWritableStream<RequestType> = callProperties.call as ClientWritableStream<RequestType>; const emitter: ClientWritableStream<RequestType> = callProperties.call as ClientWritableStream<
RequestType
>;
const interceptorArgs: InterceptorArguments = { const interceptorArgs: InterceptorArguments = {
clientInterceptors: this[INTERCEPTOR_SYMBOL], clientInterceptors: this[INTERCEPTOR_SYMBOL],
clientInterceptorProviders: this[INTERCEPTOR_PROVIDER_SYMBOL], clientInterceptorProviders: this[INTERCEPTOR_PROVIDER_SYMBOL],
callInterceptors: callProperties.callOptions.interceptors ?? [], callInterceptors: callProperties.callOptions.interceptors ?? [],
callInterceptorProviders: callProperties.callOptions.interceptor_providers ?? [], callInterceptorProviders:
callProperties.callOptions.interceptor_providers ?? [],
}; };
const call: InterceptingCallInterface = getInterceptingCall( const call: InterceptingCallInterface = getInterceptingCall(
interceptorArgs, interceptorArgs,
@ -414,7 +432,7 @@ export class Client {
let responseMessage: ResponseType | null = null; let responseMessage: ResponseType | null = null;
let receivedStatus = false; let receivedStatus = false;
call.start(callProperties.metadata, { call.start(callProperties.metadata, {
onReceiveMetadata: metadata => { onReceiveMetadata: (metadata) => {
emitter.emit('metadata', metadata); emitter.emit('metadata', metadata);
}, },
onReceiveMessage(message: any) { onReceiveMessage(message: any) {
@ -503,17 +521,22 @@ export class Client {
call: new ClientReadableStreamImpl<ResponseType>(deserialize), call: new ClientReadableStreamImpl<ResponseType>(deserialize),
channel: this[CHANNEL_SYMBOL], channel: this[CHANNEL_SYMBOL],
methodDefinition: methodDefinition, methodDefinition: methodDefinition,
callOptions: checkedArguments.options callOptions: checkedArguments.options,
}; };
if (this[CALL_INVOCATION_TRANSFORMER_SYMBOL]) { if (this[CALL_INVOCATION_TRANSFORMER_SYMBOL]) {
callProperties = this[CALL_INVOCATION_TRANSFORMER_SYMBOL]!(callProperties) as CallProperties<RequestType, ResponseType>; callProperties = this[CALL_INVOCATION_TRANSFORMER_SYMBOL]!(
callProperties
) as CallProperties<RequestType, ResponseType>;
} }
const stream: ClientReadableStream<ResponseType> = callProperties.call as ClientReadableStream<ResponseType>; const stream: ClientReadableStream<ResponseType> = callProperties.call as ClientReadableStream<
ResponseType
>;
const interceptorArgs: InterceptorArguments = { const interceptorArgs: InterceptorArguments = {
clientInterceptors: this[INTERCEPTOR_SYMBOL], clientInterceptors: this[INTERCEPTOR_SYMBOL],
clientInterceptorProviders: this[INTERCEPTOR_PROVIDER_SYMBOL], clientInterceptorProviders: this[INTERCEPTOR_PROVIDER_SYMBOL],
callInterceptors: callProperties.callOptions.interceptors ?? [], callInterceptors: callProperties.callOptions.interceptors ?? [],
callInterceptorProviders: callProperties.callOptions.interceptor_providers ?? [], callInterceptorProviders:
callProperties.callOptions.interceptor_providers ?? [],
}; };
const call: InterceptingCallInterface = getInterceptingCall( const call: InterceptingCallInterface = getInterceptingCall(
interceptorArgs, interceptorArgs,
@ -589,20 +612,29 @@ export class Client {
}; };
let callProperties: CallProperties<RequestType, ResponseType> = { let callProperties: CallProperties<RequestType, ResponseType> = {
metadata: checkedArguments.metadata, metadata: checkedArguments.metadata,
call: new ClientDuplexStreamImpl<RequestType, ResponseType>(serialize, deserialize), call: new ClientDuplexStreamImpl<RequestType, ResponseType>(
serialize,
deserialize
),
channel: this[CHANNEL_SYMBOL], channel: this[CHANNEL_SYMBOL],
methodDefinition: methodDefinition, methodDefinition: methodDefinition,
callOptions: checkedArguments.options callOptions: checkedArguments.options,
}; };
if (this[CALL_INVOCATION_TRANSFORMER_SYMBOL]) { if (this[CALL_INVOCATION_TRANSFORMER_SYMBOL]) {
callProperties = this[CALL_INVOCATION_TRANSFORMER_SYMBOL]!(callProperties) as CallProperties<RequestType, ResponseType>; callProperties = this[CALL_INVOCATION_TRANSFORMER_SYMBOL]!(
callProperties
) as CallProperties<RequestType, ResponseType>;
} }
const stream: ClientDuplexStream<RequestType, ResponseType> = callProperties.call as ClientDuplexStream<RequestType, ResponseType>; const stream: ClientDuplexStream<
RequestType,
ResponseType
> = callProperties.call as ClientDuplexStream<RequestType, ResponseType>;
const interceptorArgs: InterceptorArguments = { const interceptorArgs: InterceptorArguments = {
clientInterceptors: this[INTERCEPTOR_SYMBOL], clientInterceptors: this[INTERCEPTOR_SYMBOL],
clientInterceptorProviders: this[INTERCEPTOR_PROVIDER_SYMBOL], clientInterceptorProviders: this[INTERCEPTOR_PROVIDER_SYMBOL],
callInterceptors: callProperties.callOptions.interceptors ?? [], callInterceptors: callProperties.callOptions.interceptors ?? [],
callInterceptorProviders: callProperties.callOptions.interceptor_providers ?? [], callInterceptorProviders:
callProperties.callOptions.interceptor_providers ?? [],
}; };
const call: InterceptingCallInterface = getInterceptingCall( const call: InterceptingCallInterface = getInterceptingCall(
interceptorArgs, interceptorArgs,

View File

@ -78,7 +78,7 @@ export class FilterStackFactory implements FilterFactory<FilterStack> {
createFilter(callStream: Call): FilterStack { createFilter(callStream: Call): FilterStack {
return new FilterStack( return new FilterStack(
this.factories.map(factory => factory.createFilter(callStream)) this.factories.map((factory) => factory.createFilter(callStream))
); );
} }
} }

View File

@ -15,14 +15,18 @@
* *
*/ */
import { URL, parse } from "url"; import { URL, parse } from 'url';
import { log } from "./logging"; import { log } from './logging';
import { LogVerbosity } from "./constants"; import { LogVerbosity } from './constants';
import { parseTarget } from "./resolver-dns"; import { parseTarget } from './resolver-dns';
import { Socket } from "net"; import { Socket } from 'net';
import * as http from 'http'; import * as http from 'http';
import * as logging from './logging'; import * as logging from './logging';
import { SubchannelAddress, TcpSubchannelAddress, isTcpSubchannelAddress } from "./subchannel"; import {
SubchannelAddress,
TcpSubchannelAddress,
isTcpSubchannelAddress,
} from './subchannel';
const TRACER_NAME = 'proxy'; const TRACER_NAME = 'proxy';
@ -36,8 +40,8 @@ interface ProxyInfo {
} }
function getProxyInfo(): ProxyInfo { function getProxyInfo(): ProxyInfo {
let proxyEnv: string = ''; let proxyEnv = '';
let envVar: string = ''; let envVar = '';
/* Prefer using 'grpc_proxy'. Fallback on 'http_proxy' if it is not set. /* Prefer using 'grpc_proxy'. Fallback on 'http_proxy' if it is not set.
* Also prefer using 'https_proxy' with fallback on 'http_proxy'. The * Also prefer using 'https_proxy' with fallback on 'http_proxy'. The
* fallback behavior can be removed if there's a demand for it. * fallback behavior can be removed if there's a demand for it.
@ -62,7 +66,10 @@ function getProxyInfo(): ProxyInfo {
return {}; return {};
} }
if (proxyUrl.protocol !== 'http:') { if (proxyUrl.protocol !== 'http:') {
log(LogVerbosity.ERROR, `"${proxyUrl.protocol}" scheme not supported in proxy URI`); log(
LogVerbosity.ERROR,
`"${proxyUrl.protocol}" scheme not supported in proxy URI`
);
return {}; return {};
} }
let userCred: string | null = null; let userCred: string | null = null;
@ -75,12 +82,14 @@ function getProxyInfo(): ProxyInfo {
} }
} }
const result: ProxyInfo = { const result: ProxyInfo = {
address: proxyUrl.host address: proxyUrl.host,
}; };
if (userCred) { if (userCred) {
result.creds = userCred; result.creds = userCred;
} }
trace('Proxy server ' + result.address + ' set by environment variable ' + envVar); trace(
'Proxy server ' + result.address + ' set by environment variable ' + envVar
);
return result; return result;
} }
@ -89,7 +98,7 @@ const PROXY_INFO = getProxyInfo();
function getNoProxyHostList(): string[] { function getNoProxyHostList(): string[] {
/* Prefer using 'no_grpc_proxy'. Fallback on 'no_proxy' if it is not set. */ /* Prefer using 'no_grpc_proxy'. Fallback on 'no_proxy' if it is not set. */
let noProxyStr: string | undefined = process.env.no_grpc_proxy; let noProxyStr: string | undefined = process.env.no_grpc_proxy;
let envVar: string = 'no_grpc_proxy'; let envVar = 'no_grpc_proxy';
if (!noProxyStr) { if (!noProxyStr) {
noProxyStr = process.env.no_proxy; noProxyStr = process.env.no_proxy;
envVar = 'no_proxy'; envVar = 'no_proxy';
@ -124,20 +133,37 @@ export function shouldUseProxy(target: string): boolean {
return true; return true;
} }
export function getProxiedConnection(target: string, subchannelAddress: SubchannelAddress): Promise<Socket> { export function getProxiedConnection(
if (!(PROXY_INFO.address && shouldUseProxy(target) && isTcpSubchannelAddress(subchannelAddress))) { target: string,
subchannelAddress: SubchannelAddress
): Promise<Socket> {
if (
!(
PROXY_INFO.address &&
shouldUseProxy(target) &&
isTcpSubchannelAddress(subchannelAddress)
)
) {
return Promise.reject<Socket>(); return Promise.reject<Socket>();
} }
const subchannelAddressPathString = `${subchannelAddress.host}:${subchannelAddress.port}`; const subchannelAddressPathString = `${subchannelAddress.host}:${subchannelAddress.port}`;
trace('Using proxy ' + PROXY_INFO.address + ' to connect to ' + target + ' at ' + subchannelAddress); trace(
'Using proxy ' +
PROXY_INFO.address +
' to connect to ' +
target +
' at ' +
subchannelAddress
);
const options: http.RequestOptions = { const options: http.RequestOptions = {
method: 'CONNECT', method: 'CONNECT',
host: PROXY_INFO.address, host: PROXY_INFO.address,
path: subchannelAddressPathString path: subchannelAddressPathString,
}; };
if (PROXY_INFO.creds) { if (PROXY_INFO.creds) {
options.headers = { options.headers = {
'Proxy-Authorization': 'Basic ' + Buffer.from(PROXY_INFO.creds).toString('base64') 'Proxy-Authorization':
'Basic ' + Buffer.from(PROXY_INFO.creds).toString('base64'),
}; };
} }
return new Promise<Socket>((resolve, reject) => { return new Promise<Socket>((resolve, reject) => {
@ -146,10 +172,20 @@ export function getProxiedConnection(target: string, subchannelAddress: Subchann
request.removeAllListeners(); request.removeAllListeners();
socket.removeAllListeners(); socket.removeAllListeners();
if (res.statusCode === http.STATUS_CODES.OK) { if (res.statusCode === http.STATUS_CODES.OK) {
trace('Successfully connected to ' + subchannelAddress + ' through proxy ' + PROXY_INFO.address); trace(
'Successfully connected to ' +
subchannelAddress +
' through proxy ' +
PROXY_INFO.address
);
resolve(socket); resolve(socket);
} else { } else {
trace('Failed to connect to ' + subchannelAddress + ' through proxy ' + PROXY_INFO.address); trace(
'Failed to connect to ' +
subchannelAddress +
' through proxy ' +
PROXY_INFO.address
);
reject(); reject();
} }
}); });

View File

@ -28,7 +28,12 @@ import { CallCredentials } from './call-credentials';
import { Deadline, StatusObject } from './call-stream'; import { Deadline, StatusObject } from './call-stream';
import { Channel, ConnectivityState, ChannelImplementation } from './channel'; import { Channel, ConnectivityState, ChannelImplementation } from './channel';
import { ChannelCredentials } from './channel-credentials'; import { ChannelCredentials } from './channel-credentials';
import { CallOptions, Client, CallInvocationTransformer, CallProperties } from './client'; import {
CallOptions,
Client,
CallInvocationTransformer,
CallProperties,
} from './client';
import { LogVerbosity, Status } from './constants'; import { LogVerbosity, Status } from './constants';
import * as logging from './logging'; import * as logging from './logging';
import { import {
@ -129,14 +134,14 @@ export const credentials = mixin(
}); });
} }
getHeaders.then( getHeaders.then(
headers => { (headers) => {
const metadata = new Metadata(); const metadata = new Metadata();
for (const key of Object.keys(headers)) { for (const key of Object.keys(headers)) {
metadata.add(key, headers[key]); metadata.add(key, headers[key]);
} }
callback(null, metadata); callback(null, metadata);
}, },
err => { (err) => {
callback(err); callback(err);
} }
); );
@ -202,7 +207,7 @@ export {
CallProperties, CallProperties,
CallInvocationTransformer, CallInvocationTransformer,
ChannelImplementation as Channel, ChannelImplementation as Channel,
Channel as ChannelInterface Channel as ChannelInterface,
}; };
/** /**

View File

@ -338,11 +338,11 @@ export class PickFirstLoadBalancer implements LoadBalancer {
this.resetSubchannelList(); this.resetSubchannelList();
trace( trace(
'Connect to address list ' + 'Connect to address list ' +
this.latestAddressList.map(address => this.latestAddressList.map((address) =>
subchannelAddressToString(address) subchannelAddressToString(address)
) )
); );
this.subchannels = this.latestAddressList.map(address => this.subchannels = this.latestAddressList.map((address) =>
this.channelControlHelper.createSubchannel(address, {}) this.channelControlHelper.createSubchannel(address, {})
); );
for (const subchannel of this.subchannels) { for (const subchannel of this.subchannels) {

View File

@ -125,7 +125,7 @@ export class RoundRobinLoadBalancer implements LoadBalancer {
private calculateAndUpdateState() { private calculateAndUpdateState() {
if (this.subchannelStateCounts[ConnectivityState.READY] > 0) { if (this.subchannelStateCounts[ConnectivityState.READY] > 0) {
const readySubchannels = this.subchannels.filter( const readySubchannels = this.subchannels.filter(
subchannel => (subchannel) =>
subchannel.getConnectivityState() === ConnectivityState.READY subchannel.getConnectivityState() === ConnectivityState.READY
); );
let index = 0; let index = 0;
@ -192,9 +192,9 @@ export class RoundRobinLoadBalancer implements LoadBalancer {
this.resetSubchannelList(); this.resetSubchannelList();
trace( trace(
'Connect to address list ' + 'Connect to address list ' +
addressList.map(address => subchannelAddressToString(address)) addressList.map((address) => subchannelAddressToString(address))
); );
this.subchannels = addressList.map(address => this.subchannels = addressList.map((address) =>
this.channelControlHelper.createSubchannel(address, {}) this.channelControlHelper.createSubchannel(address, {})
); );
for (const subchannel of this.subchannels) { for (const subchannel of this.subchannels) {

View File

@ -116,7 +116,7 @@ export function makeClientConstructor(
[methodName: string]: Function; [methodName: string]: Function;
} }
Object.keys(methods).forEach(name => { Object.keys(methods).forEach((name) => {
const attrs = methods[name]; const attrs = methods[name];
let methodType: keyof typeof requesterFuncs; let methodType: keyof typeof requesterFuncs;
// TODO(murgatroid99): Verify that we don't need this anymore // TODO(murgatroid99): Verify that we don't need this anymore
@ -164,7 +164,7 @@ function partial(
serialize: Function, serialize: Function,
deserialize: Function deserialize: Function
): Function { ): Function {
return function(this: any, ...args: any[]) { return function (this: any, ...args: any[]) {
return fn.call(this, path, serialize, deserialize, ...args); return fn.call(this, path, serialize, deserialize, ...args);
}; };
} }

View File

@ -178,7 +178,7 @@ export class Metadata {
const newInternalRepr = newMetadata.internalRepr; const newInternalRepr = newMetadata.internalRepr;
this.internalRepr.forEach((value, key) => { this.internalRepr.forEach((value, key) => {
const clonedValue: MetadataValue[] = value.map(v => { const clonedValue: MetadataValue[] = value.map((v) => {
if (v instanceof Buffer) { if (v instanceof Buffer) {
return Buffer.from(v); return Buffer.from(v);
} else { } else {
@ -226,7 +226,7 @@ export class Metadata {
this.internalRepr.forEach((values, key) => { this.internalRepr.forEach((values, key) => {
// We assume that the user's interaction with this object is limited to // We assume that the user's interaction with this object is limited to
// through its public API (i.e. keys and values are already validated). // through its public API (i.e. keys and values are already validated).
result[key] = values.map(value => { result[key] = values.map((value) => {
if (value instanceof Buffer) { if (value instanceof Buffer) {
return value.toString('base64'); return value.toString('base64');
} else { } else {
@ -249,7 +249,7 @@ export class Metadata {
*/ */
static fromHttp2Headers(headers: http2.IncomingHttpHeaders): Metadata { static fromHttp2Headers(headers: http2.IncomingHttpHeaders): Metadata {
const result = new Metadata(); const result = new Metadata();
Object.keys(headers).forEach(key => { Object.keys(headers).forEach((key) => {
// Reserved headers (beginning with `:`) are not valid keys. // Reserved headers (beginning with `:`) are not valid keys.
if (key.charAt(0) === ':') { if (key.charAt(0) === ':') {
return; return;
@ -260,12 +260,12 @@ export class Metadata {
try { try {
if (isBinaryKey(key)) { if (isBinaryKey(key)) {
if (Array.isArray(values)) { if (Array.isArray(values)) {
values.forEach(value => { values.forEach((value) => {
result.add(key, Buffer.from(value, 'base64')); result.add(key, Buffer.from(value, 'base64'));
}); });
} else if (values !== undefined) { } else if (values !== undefined) {
if (isCustomMetadata(key)) { if (isCustomMetadata(key)) {
values.split(',').forEach(v => { values.split(',').forEach((v) => {
result.add(key, Buffer.from(v.trim(), 'base64')); result.add(key, Buffer.from(v.trim(), 'base64'));
}); });
} else { } else {
@ -274,12 +274,12 @@ export class Metadata {
} }
} else { } else {
if (Array.isArray(values)) { if (Array.isArray(values)) {
values.forEach(value => { values.forEach((value) => {
result.add(key, value); result.add(key, value);
}); });
} else if (values !== undefined) { } else if (values !== undefined) {
if (isCustomMetadata(key)) { if (isCustomMetadata(key)) {
values.split(',').forEach(v => result.add(key, v.trim())); values.split(',').forEach((v) => result.add(key, v.trim()));
} else { } else {
result.add(key, values); result.add(key, values);
} }

View File

@ -109,7 +109,7 @@ function mergeArrays<T>(...arrays: T[][]): T[] {
i < i <
Math.max.apply( Math.max.apply(
null, null,
arrays.map(array => array.length) arrays.map((array) => array.length)
); );
i++ i++
) { ) {
@ -186,19 +186,24 @@ class DnsResolver implements Resolver {
* if the name exists but there are no records for that family, and that * if the name exists but there are no records for that family, and that
* error is indistinguishable from other kinds of errors */ * error is indistinguishable from other kinds of errors */
this.pendingLookupPromise = dnsLookupPromise(hostname, { all: true }); this.pendingLookupPromise = dnsLookupPromise(hostname, { all: true });
this.pendingLookupPromise.then(addressList => { this.pendingLookupPromise.then(
(addressList) => {
this.pendingLookupPromise = null; this.pendingLookupPromise = null;
const ip4Addresses: dns.LookupAddress[] = addressList.filter( const ip4Addresses: dns.LookupAddress[] = addressList.filter(
addr => addr.family === 4 (addr) => addr.family === 4
);
const ip6Addresses: dns.LookupAddress[] = addressList.filter(
(addr) => addr.family === 6
); );
const ip6Addresses: dns.LookupAddress[] = addressList.filter(addr => addr.family === 6);
this.latestLookupResult = mergeArrays( this.latestLookupResult = mergeArrays(
ip6Addresses, ip6Addresses,
ip4Addresses ip4Addresses
).map(addr => ({ host: addr.address, port: +this.port! })); ).map((addr) => ({ host: addr.address, port: +this.port! }));
const allAddressesString: string = const allAddressesString: string =
'[' + '[' +
this.latestLookupResult.map(addr => addr.host + ':' + addr.port).join(',') + this.latestLookupResult
.map((addr) => addr.host + ':' + addr.port)
.join(',') +
']'; ']';
trace( trace(
'Resolved addresses for target ' + 'Resolved addresses for target ' +
@ -220,7 +225,7 @@ class DnsResolver implements Resolver {
this.latestServiceConfigError this.latestServiceConfigError
); );
}, },
err => { (err) => {
trace( trace(
'Resolution error for target ' + 'Resolution error for target ' +
this.target + this.target +
@ -229,7 +234,8 @@ class DnsResolver implements Resolver {
); );
this.pendingLookupPromise = null; this.pendingLookupPromise = null;
this.listener.onError(this.defaultResolutionError); this.listener.onError(this.defaultResolutionError);
}); }
);
/* If there already is a still-pending TXT resolution, we can just use /* If there already is a still-pending TXT resolution, we can just use
* that result when it comes in */ * that result when it comes in */
if (this.pendingTxtPromise === null) { if (this.pendingTxtPromise === null) {
@ -237,7 +243,8 @@ class DnsResolver implements Resolver {
* the name resolution attempt as a whole is a success even if the TXT * the name resolution attempt as a whole is a success even if the TXT
* lookup fails */ * lookup fails */
this.pendingTxtPromise = resolveTxtPromise(hostname); this.pendingTxtPromise = resolveTxtPromise(hostname);
this.pendingTxtPromise.then(txtRecord => { this.pendingTxtPromise.then(
(txtRecord) => {
this.pendingTxtPromise = null; this.pendingTxtPromise = null;
try { try {
this.latestServiceConfig = extractAndSelectServiceConfig( this.latestServiceConfig = extractAndSelectServiceConfig(
@ -260,9 +267,10 @@ class DnsResolver implements Resolver {
this.latestLookupResult, this.latestLookupResult,
this.latestServiceConfig, this.latestServiceConfig,
this.latestServiceConfigError this.latestServiceConfigError
) );
} }
}, err => { },
(err) => {
this.latestServiceConfigError = { this.latestServiceConfigError = {
code: Status.UNAVAILABLE, code: Status.UNAVAILABLE,
details: 'TXT query failed', details: 'TXT query failed',
@ -273,9 +281,10 @@ class DnsResolver implements Resolver {
this.latestLookupResult, this.latestLookupResult,
this.latestServiceConfig, this.latestServiceConfig,
this.latestServiceConfigError this.latestServiceConfigError
) );
} }
}); }
);
} }
} }
} }
@ -329,11 +338,15 @@ export interface dnsUrl {
} }
export function parseTarget(target: string): dnsUrl | null { export function parseTarget(target: string): dnsUrl | null {
const match = IPV4_REGEX.exec(target) ?? IPV6_REGEX.exec(target) ?? IPV6_BRACKET_REGEX.exec(target) ?? DNS_REGEX.exec(target) const match =
IPV4_REGEX.exec(target) ??
IPV6_REGEX.exec(target) ??
IPV6_BRACKET_REGEX.exec(target) ??
DNS_REGEX.exec(target);
if (match) { if (match) {
return { return {
host: match[1], host: match[1],
port: match[2] ?? undefined port: match[2] ?? undefined,
}; };
} else { } else {
return null; return null;

View File

@ -157,7 +157,7 @@ export class ServerWritableStreamImpl<RequestType, ResponseType>
this.trailingMetadata = new Metadata(); this.trailingMetadata = new Metadata();
this.call.setupSurfaceCall(this); this.call.setupSurfaceCall(this);
this.on('error', err => { this.on('error', (err) => {
this.call.sendError(err); this.call.sendError(err);
this.end(); this.end();
}); });
@ -226,7 +226,7 @@ export class ServerDuplexStreamImpl<RequestType, ResponseType> extends Duplex
this.call.setupSurfaceCall(this); this.call.setupSurfaceCall(this);
this.call.setupReadable(this); this.call.setupReadable(this);
this.on('error', err => { this.on('error', (err) => {
this.call.sendError(err); this.call.sendError(err);
this.end(); this.end();
}); });
@ -562,7 +562,7 @@ export class Http2ServerCallStream<
} }
setupSurfaceCall(call: ServerSurfaceCall) { setupSurfaceCall(call: ServerSurfaceCall) {
this.once('cancelled', reason => { this.once('cancelled', (reason) => {
call.cancelled = true; call.cancelled = true;
call.emit('cancelled', reason); call.emit('cancelled', reason);
}); });

View File

@ -48,7 +48,11 @@ import { ServerCredentials } from './server-credentials';
import { ChannelOptions } from './channel-options'; import { ChannelOptions } from './channel-options';
import { createResolver, ResolverListener } from './resolver'; import { createResolver, ResolverListener } from './resolver';
import { log } from './logging'; import { log } from './logging';
import { SubchannelAddress, TcpSubchannelAddress, isTcpSubchannelAddress } from './subchannel'; import {
SubchannelAddress,
TcpSubchannelAddress,
isTcpSubchannelAddress,
} from './subchannel';
interface BindResult { interface BindResult {
port: number; port: number;
@ -152,7 +156,7 @@ export class Server {
throw new Error('Cannot add an empty service to a server'); throw new Error('Cannot add an empty service to a server');
} }
serviceKeys.forEach(name => { serviceKeys.forEach((name) => {
const attrs = service[name]; const attrs = service[name];
let methodType: HandlerType; let methodType: HandlerType;
@ -244,25 +248,30 @@ export class Server {
http2Server.setTimeout(0, noop); http2Server.setTimeout(0, noop);
this._setupHandlers(http2Server); this._setupHandlers(http2Server);
return http2Server; return http2Server;
} };
const bindSpecificPort = (addressList: SubchannelAddress[], portNum: number, previousCount: number): Promise<BindResult> => { const bindSpecificPort = (
addressList: SubchannelAddress[],
portNum: number,
previousCount: number
): Promise<BindResult> => {
if (addressList.length === 0) { if (addressList.length === 0) {
return Promise.resolve({port: portNum, count: previousCount}); return Promise.resolve({ port: portNum, count: previousCount });
} }
return Promise.all(addressList.map(address => { return Promise.all(
addressList.map((address) => {
let addr: SubchannelAddress; let addr: SubchannelAddress;
if (isTcpSubchannelAddress(address)) { if (isTcpSubchannelAddress(address)) {
addr = { addr = {
host: (address as TcpSubchannelAddress).host, host: (address as TcpSubchannelAddress).host,
port: portNum port: portNum,
}; };
} else { } else {
addr = address addr = address;
} }
const http2Server = setupServer(); const http2Server = setupServer();
return new Promise<number|Error>((resolve, reject) => { return new Promise<number | Error>((resolve, reject) => {
function onError(err: Error): void { function onError(err: Error): void {
resolve(err); resolve(err);
} }
@ -279,27 +288,32 @@ export class Server {
} }
http2Server.removeListener('error', onError); http2Server.removeListener('error', onError);
}); });
});
}) })
})).then(results => { ).then((results) => {
let count = 0; let count = 0;
for (const result of results) { for (const result of results) {
if (typeof result === 'number') { if (typeof result === 'number') {
count += 1; count += 1;
if (result !== portNum) { if (result !== portNum) {
throw new Error('Invalid state: multiple port numbers added from single address'); throw new Error(
'Invalid state: multiple port numbers added from single address'
);
} }
} }
} }
return { return {
port: portNum, port: portNum,
count: count + previousCount count: count + previousCount,
}; };
}); });
} };
const bindWildcardPort = (addressList: SubchannelAddress[]): Promise<BindResult> => { const bindWildcardPort = (
addressList: SubchannelAddress[]
): Promise<BindResult> => {
if (addressList.length === 0) { if (addressList.length === 0) {
return Promise.resolve<BindResult>({port: 0, count: 0}); return Promise.resolve<BindResult>({ port: 0, count: 0 });
} }
const address = addressList[0]; const address = addressList[0];
const http2Server = setupServer(); const http2Server = setupServer();
@ -312,16 +326,26 @@ export class Server {
http2Server.listen(address, () => { http2Server.listen(address, () => {
this.http2ServerList.push(http2Server); this.http2ServerList.push(http2Server);
resolve(bindSpecificPort(addressList.slice(1), (http2Server.address() as AddressInfo).port, 1)); resolve(
bindSpecificPort(
addressList.slice(1),
(http2Server.address() as AddressInfo).port,
1
)
);
http2Server.removeListener('error', onError); http2Server.removeListener('error', onError);
}); });
}); });
} };
const resolverListener: ResolverListener = { const resolverListener: ResolverListener = {
onSuccessfulResolution: (addressList, serviceConfig, serviceConfigError) => { onSuccessfulResolution: (
addressList,
serviceConfig,
serviceConfigError
) => {
// We only want one resolution result. Discard all future results // We only want one resolution result. Discard all future results
resolverListener.onSuccessfulResolution = () => {} resolverListener.onSuccessfulResolution = () => {};
if (addressList.length === 0) { if (addressList.length === 0) {
callback(new Error(`No addresses resolved for port ${port}`), 0); callback(new Error(`No addresses resolved for port ${port}`), 0);
return; return;
@ -331,32 +355,42 @@ export class Server {
if (addressList[0].port === 0) { if (addressList[0].port === 0) {
bindResultPromise = bindWildcardPort(addressList); bindResultPromise = bindWildcardPort(addressList);
} else { } else {
bindResultPromise = bindSpecificPort(addressList, addressList[0].port, 0); bindResultPromise = bindSpecificPort(
addressList,
addressList[0].port,
0
);
} }
} else{ } else {
// Use an arbitrary non-zero port for non-TCP addresses // Use an arbitrary non-zero port for non-TCP addresses
bindResultPromise = bindSpecificPort(addressList, 1, 0); bindResultPromise = bindSpecificPort(addressList, 1, 0);
} }
bindResultPromise.then(bindResult => { bindResultPromise.then(
(bindResult) => {
if (bindResult.count === 0) { if (bindResult.count === 0) {
const errorString = `No address added out of total ${addressList.length} resolved`; const errorString = `No address added out of total ${addressList.length} resolved`;
log(LogVerbosity.ERROR, errorString); log(LogVerbosity.ERROR, errorString);
callback(new Error(errorString), 0); callback(new Error(errorString), 0);
} else { } else {
if (bindResult.count < addressList.length) { if (bindResult.count < addressList.length) {
log(LogVerbosity.INFO, `WARNING Only ${bindResult.count} addresses added out of total ${addressList.length} resolved`); log(
LogVerbosity.INFO,
`WARNING Only ${bindResult.count} addresses added out of total ${addressList.length} resolved`
);
} }
callback(null, bindResult.port); callback(null, bindResult.port);
} }
}, (error) => { },
(error) => {
const errorString = `No address added out of total ${addressList.length} resolved`; const errorString = `No address added out of total ${addressList.length} resolved`;
log(LogVerbosity.ERROR, errorString); log(LogVerbosity.ERROR, errorString);
callback(new Error(errorString), 0); callback(new Error(errorString), 0);
}); }
);
}, },
onError: (error) => { onError: (error) => {
callback(new Error(error.details), 0); callback(new Error(error.details), 0);
} },
}; };
const resolver = createResolver(port, resolverListener); const resolver = createResolver(port, resolverListener);
@ -376,7 +410,7 @@ export class Server {
// Always destroy any available sessions. It's possible that one or more // Always destroy any available sessions. It's possible that one or more
// tryShutdown() calls are in progress. Don't wait on them to finish. // tryShutdown() calls are in progress. Don't wait on them to finish.
this.sessions.forEach(session => { this.sessions.forEach((session) => {
// Cast NGHTTP2_CANCEL to any because TypeScript doesn't seem to // Cast NGHTTP2_CANCEL to any because TypeScript doesn't seem to
// recognize destroy(code) as a valid signature. // recognize destroy(code) as a valid signature.
session.destroy(http2.constants.NGHTTP2_CANCEL as any); session.destroy(http2.constants.NGHTTP2_CANCEL as any);
@ -405,7 +439,12 @@ export class Server {
} }
start(): void { start(): void {
if (this.http2ServerList.length === 0 || this.http2ServerList.every(http2Server => http2Server.listening !== true)) { if (
this.http2ServerList.length === 0 ||
this.http2ServerList.every(
(http2Server) => http2Server.listening !== true
)
) {
throw new Error('server must be bound in order to start'); throw new Error('server must be bound in order to start');
} }
@ -439,7 +478,7 @@ export class Server {
// If any sessions are active, close them gracefully. // If any sessions are active, close them gracefully.
pendingChecks += this.sessions.size; pendingChecks += this.sessions.size;
this.sessions.forEach(session => { this.sessions.forEach((session) => {
session.close(maybeCallback); session.close(maybeCallback);
}); });
if (pendingChecks === 0) { if (pendingChecks === 0) {
@ -451,7 +490,9 @@ export class Server {
throw new Error('Not yet implemented'); throw new Error('Not yet implemented');
} }
private _setupHandlers(http2Server: http2.Http2Server | http2.Http2SecureServer): void { private _setupHandlers(
http2Server: http2.Http2Server | http2.Http2SecureServer
): void {
if (http2Server === null) { if (http2Server === null) {
return; return;
} }
@ -525,7 +566,7 @@ export class Server {
} }
); );
http2Server.on('session', session => { http2Server.on('session', (session) => {
if (!this.started) { if (!this.started) {
session.destroy(); session.destroy();
return; return;

View File

@ -67,7 +67,7 @@ export class SubchannelPool {
const subchannelObjArray = this.pool[channelTarget]; const subchannelObjArray = this.pool[channelTarget];
const refedSubchannels = subchannelObjArray.filter( const refedSubchannels = subchannelObjArray.filter(
value => !value.subchannel.unrefIfOneRef() (value) => !value.subchannel.unrefIfOneRef()
); );
if (refedSubchannels.length > 0) { if (refedSubchannels.length > 0) {

View File

@ -210,7 +210,7 @@ export class Subchannel {
`grpc-node-js/${clientVersion}`, `grpc-node-js/${clientVersion}`,
options['grpc.secondary_user_agent'], options['grpc.secondary_user_agent'],
] ]
.filter(e => e) .filter((e) => e)
.join(' '); // remove falsey values first .join(' '); // remove falsey values first
if ('grpc.keepalive_time_ms' in options) { if ('grpc.keepalive_time_ms' in options) {
@ -397,7 +397,7 @@ export class Subchannel {
} }
} }
); );
session.once('error', error => { session.once('error', (error) => {
/* Do nothing here. Any error should also trigger a close event, which is /* Do nothing here. Any error should also trigger a close event, which is
* where we want to handle that. */ * where we want to handle that. */
trace( trace(
@ -410,11 +410,17 @@ export class Subchannel {
private startConnectingInternal() { private startConnectingInternal() {
if (shouldUseProxy(this.channelTarget)) { if (shouldUseProxy(this.channelTarget)) {
getProxiedConnection(this.channelTarget, this.subchannelAddress).then((socket) => { getProxiedConnection(this.channelTarget, this.subchannelAddress).then(
(socket) => {
this.createSession(socket); this.createSession(socket);
}, (reason) => { },
this.transitionToState([ConnectivityState.CONNECTING], ConnectivityState.TRANSIENT_FAILURE); (reason) => {
}); this.transitionToState(
[ConnectivityState.CONNECTING],
ConnectivityState.TRANSIENT_FAILURE
);
}
);
} else { } else {
this.createSession(); this.createSession();
} }
@ -589,7 +595,7 @@ export class Subchannel {
const http2Stream = this.session!.request(headers); const http2Stream = this.session!.request(headers);
let headersString = ''; let headersString = '';
for (const header of Object.keys(headers)) { for (const header of Object.keys(headers)) {
headersString += '\t\t' + header + ': ' + headers[header] + '\n' headersString += '\t\t' + header + ': ' + headers[header] + '\n';
} }
trace('Starting stream with headers\n' + headersString); trace('Starting stream with headers\n' + headersString);
callStream.attachHttp2Stream(http2Stream, this); callStream.attachHttp2Stream(http2Stream, this);