From b84d2f3b39eb2e51457e372fd5972df77281fbb6 Mon Sep 17 00:00:00 2001 From: Patrick Remy Date: Thu, 9 Apr 2020 11:54:09 +0200 Subject: [PATCH] grpc-js: run gts fix for src --- packages/grpc-js/src/call-credentials.ts | 2 +- packages/grpc-js/src/call-stream.ts | 29 ++- packages/grpc-js/src/call.ts | 12 +- packages/grpc-js/src/channel.ts | 76 +++++--- packages/grpc-js/src/client-interceptors.ts | 36 ++-- packages/grpc-js/src/client.ts | 78 +++++--- packages/grpc-js/src/filter-stack.ts | 2 +- packages/grpc-js/src/http_proxy.ts | 74 ++++++-- packages/grpc-js/src/index.ts | 13 +- .../grpc-js/src/load-balancer-pick-first.ts | 4 +- .../grpc-js/src/load-balancer-round-robin.ts | 6 +- packages/grpc-js/src/make-client.ts | 4 +- packages/grpc-js/src/metadata.ts | 14 +- packages/grpc-js/src/resolver-dns.ts | 171 ++++++++++-------- packages/grpc-js/src/server-call.ts | 6 +- packages/grpc-js/src/server.ts | 161 +++++++++++------ packages/grpc-js/src/subchannel-pool.ts | 2 +- packages/grpc-js/src/subchannel.ts | 26 ++- 18 files changed, 435 insertions(+), 281 deletions(-) diff --git a/packages/grpc-js/src/call-credentials.ts b/packages/grpc-js/src/call-credentials.ts index e38672ae..e5d22bf7 100644 --- a/packages/grpc-js/src/call-credentials.ts +++ b/packages/grpc-js/src/call-credentials.ts @@ -79,7 +79,7 @@ class ComposedCallCredentials extends CallCredentials { async generateMetadata(options: CallMetadataOptions): Promise { const base: Metadata = new Metadata(); const generated: Metadata[] = await Promise.all( - this.creds.map(cred => cred.generateMetadata(options)) + this.creds.map((cred) => cred.generateMetadata(options)) ); for (const gen of generated) { base.merge(gen); diff --git a/packages/grpc-js/src/call-stream.ts b/packages/grpc-js/src/call-stream.ts index ee7024c2..f40366ca 100644 --- a/packages/grpc-js/src/call-stream.ts +++ b/packages/grpc-js/src/call-stream.ts @@ -111,7 +111,7 @@ export class InterceptingListenerImpl implements InterceptingListener { ) {} onReceiveMetadata(metadata: Metadata): void { - this.listener.onReceiveMetadata(metadata, metadata => { + this.listener.onReceiveMetadata(metadata, (metadata) => { this.nextListener.onReceiveMetadata(metadata); }); } @@ -119,7 +119,7 @@ export class InterceptingListenerImpl implements InterceptingListener { /* If this listener processes messages asynchronously, the last message may * be reordered with respect to the status */ this.processingMessage = true; - this.listener.onReceiveMessage(message, msg => { + this.listener.onReceiveMessage(message, (msg) => { this.processingMessage = false; this.nextListener.onReceiveMessage(msg); if (this.pendingStatus) { @@ -128,7 +128,7 @@ export class InterceptingListenerImpl implements InterceptingListener { }); } onReceiveStatus(status: StatusObject): void { - this.listener.onReceiveStatus(status, processedStatus => { + this.listener.onReceiveStatus(status, (processedStatus) => { if (this.processingMessage) { this.pendingStatus = processedStatus; } else { @@ -221,7 +221,9 @@ export class Http2CallStream implements Call { /* Precondition: this.finalStatus !== null */ if (!this.statusOutput) { this.statusOutput = true; - const filteredStatus = this.filterStack.receiveTrailers(this.finalStatus!); + const filteredStatus = this.filterStack.receiveTrailers( + this.finalStatus! + ); this.listener!.onReceiveStatus(filteredStatus); if (this.subchannel) { this.subchannel.callUnref(); @@ -352,7 +354,7 @@ export class Http2CallStream implements Call { private handleTrailers(headers: http2.IncomingHttpHeaders) { let headersString = ''; for (const header of Object.keys(headers)) { - headersString += '\t\t' + header + ': ' + headers[header] + '\n' + headersString += '\t\t' + header + ': ' + headers[header] + '\n'; } this.trace('Received server trailers:\n' + headersString); let metadata: Metadata; @@ -363,7 +365,10 @@ export class Http2CallStream implements Call { } const metadataMap = metadata.getMap(); let code: Status = this.mappedStatusCode; - if (code === Status.UNKNOWN && typeof metadataMap['grpc-status'] === 'string') { + if ( + code === Status.UNKNOWN && + typeof metadataMap['grpc-status'] === 'string' + ) { const receivedStatus = Number(metadataMap['grpc-status']); if (receivedStatus in Status) { code = receivedStatus; @@ -375,7 +380,9 @@ export class Http2CallStream implements Call { if (typeof metadataMap['grpc-message'] === 'string') { details = decodeURI(metadataMap['grpc-message']); metadata.remove('grpc-message'); - this.trace('received status details string "' + details + '" from server'); + this.trace( + 'received status details string "' + details + '" from server' + ); } const status: StatusObject = { code, details, metadata }; let finalStatus; @@ -412,7 +419,7 @@ export class Http2CallStream implements Call { stream.on('response', (headers, flags) => { let headersString = ''; for (const header of Object.keys(headers)) { - headersString += '\t\t' + header + ': ' + headers[header] + '\n' + headersString += '\t\t' + header + ': ' + headers[header] + '\n'; } this.trace('Received server headers:\n' + headersString); switch (headers[':status']) { @@ -575,7 +582,9 @@ export class Http2CallStream implements Call { } cancelWithStatus(status: Status, details: string): void { - this.trace('cancelWithStatus code: ' + status + ' details: "' + details + '"'); + this.trace( + 'cancelWithStatus code: ' + status + ' details: "' + details + '"' + ); this.destroyHttp2Stream(); this.endCall({ code: status, details, metadata: new Metadata() }); } @@ -650,7 +659,7 @@ export class Http2CallStream implements Call { }; const cb: WriteCallback = context.callback ?? (() => {}); this.isWriteFilterPending = true; - this.filterStack.sendMessage(Promise.resolve(writeObj)).then(message => { + this.filterStack.sendMessage(Promise.resolve(writeObj)).then((message) => { this.isWriteFilterPending = false; if (this.http2Stream === null) { this.trace( diff --git a/packages/grpc-js/src/call.ts b/packages/grpc-js/src/call.ts index 61885cd6..0d88ef15 100644 --- a/packages/grpc-js/src/call.ts +++ b/packages/grpc-js/src/call.ts @@ -100,9 +100,7 @@ export class ClientUnaryCallImpl extends EventEmitter export class ClientReadableStreamImpl extends Readable implements ClientReadableStream { public call?: InterceptingCallInterface; - constructor( - readonly deserialize: (chunk: Buffer) => ResponseType - ) { + constructor(readonly deserialize: (chunk: Buffer) => ResponseType) { super({ objectMode: true }); } @@ -122,9 +120,7 @@ export class ClientReadableStreamImpl extends Readable export class ClientWritableStreamImpl extends Writable implements ClientWritableStream { public call?: InterceptingCallInterface; - constructor( - readonly serialize: (value: RequestType) => Buffer - ) { + constructor(readonly serialize: (value: RequestType) => Buffer) { super({ objectMode: true }); } @@ -140,7 +136,7 @@ export class ClientWritableStreamImpl extends Writable const context: MessageContext = { callback: cb, }; - const flags: number = Number(encoding); + const flags = Number(encoding); if (!Number.isNaN(flags)) { context.flags = flags; } @@ -179,7 +175,7 @@ export class ClientDuplexStreamImpl extends Duplex const context: MessageContext = { callback: cb, }; - const flags: number = Number(encoding); + const flags = Number(encoding); if (!Number.isNaN(flags)) { context.flags = flags; } diff --git a/packages/grpc-js/src/channel.ts b/packages/grpc-js/src/channel.ts index 4d07dde0..a8b9f4ec 100644 --- a/packages/grpc-js/src/channel.ts +++ b/packages/grpc-js/src/channel.ts @@ -144,11 +144,23 @@ export class ChannelImplementation implements Channel { throw new TypeError('Channel target must be a string'); } if (!(credentials instanceof ChannelCredentials)) { - throw new TypeError('Channel credentials must be a ChannelCredentials object'); + throw new TypeError( + 'Channel credentials must be a ChannelCredentials object' + ); } if (options) { - if ((typeof options !== 'object') || !Object.values(options).every(value => typeof value === 'string' || typeof value === 'number' || typeof value === 'undefined')) { - throw new TypeError('Channel options must be an object with string or number values'); + if ( + typeof options !== 'object' || + !Object.values(options).every( + (value) => + typeof value === 'string' || + typeof value === 'number' || + typeof value === 'undefined' + ) + ) { + throw new TypeError( + 'Channel options must be an object with string or number values' + ); } } /* The global boolean parameter to getSubchannelPool has the inverse meaning to what @@ -265,7 +277,7 @@ export class ChannelImplementation implements Channel { callStream.filterStack .sendMetadata(Promise.resolve(callMetadata.clone())) .then( - finalMetadata => { + (finalMetadata) => { const subchannelState: ConnectivityState = pickResult.subchannel!.getConnectivityState(); if (subchannelState === ConnectivityState.READY) { try { @@ -274,28 +286,31 @@ export class ChannelImplementation implements Channel { callStream ); } catch (error) { - if ((error as NodeJS.ErrnoException).code === 'ERR_HTTP2_GOAWAY_SESSION') { + if ( + (error as NodeJS.ErrnoException).code === + 'ERR_HTTP2_GOAWAY_SESSION' + ) { /* An error here indicates that something went wrong with - * the picked subchannel's http2 stream right before we - * tried to start the stream. We are handling a promise - * result here, so this is asynchronous with respect to the - * original tryPick call, so calling it again is not - * recursive. We call tryPick immediately instead of - * queueing this pick again because handling the queue is - * triggered by state changes, and we want to immediately - * check if the state has already changed since the - * previous tryPick call. We do this instead of cancelling - * the stream because the correct behavior may be - * re-queueing instead, based on the logic in the rest of - * tryPick */ + * the picked subchannel's http2 stream right before we + * tried to start the stream. We are handling a promise + * result here, so this is asynchronous with respect to the + * original tryPick call, so calling it again is not + * recursive. We call tryPick immediately instead of + * queueing this pick again because handling the queue is + * triggered by state changes, and we want to immediately + * check if the state has already changed since the + * previous tryPick call. We do this instead of cancelling + * the stream because the correct behavior may be + * re-queueing instead, based on the logic in the rest of + * tryPick */ trace( LogVerbosity.INFO, 'channel', 'Failed to start call on picked subchannel ' + - pickResult.subchannel!.getAddress() + - ' with error ' + - (error as Error).message + - '. Retrying pick' + pickResult.subchannel!.getAddress() + + ' with error ' + + (error as Error).message + + '. Retrying pick' ); this.tryPick(callStream, callMetadata); } else { @@ -303,12 +318,15 @@ export class ChannelImplementation implements Channel { LogVerbosity.INFO, 'channel', 'Failed to start call on picked subchanel ' + - pickResult.subchannel!.getAddress() + - ' with error ' + - (error as Error).message + - '. Ending call' + pickResult.subchannel!.getAddress() + + ' with error ' + + (error as Error).message + + '. Ending call' + ); + callStream.cancelWithStatus( + Status.INTERNAL, + 'Failed to start HTTP/2 stream' ); - callStream.cancelWithStatus(Status.INTERNAL, 'Failed to start HTTP/2 stream'); } } } else { @@ -360,7 +378,7 @@ export class ChannelImplementation implements Channel { watcherObject: ConnectivityStateWatcher ) { const watcherIndex = this.connectivityStateWatchers.findIndex( - value => value === watcherObject + (value) => value === watcherObject ); if (watcherIndex >= 0) { this.connectivityStateWatchers.splice(watcherIndex, 1); @@ -450,7 +468,9 @@ export class ChannelImplementation implements Channel { throw new TypeError('Channel#createCall: method must be a string'); } if (!(typeof deadline === 'number' || deadline instanceof Date)) { - throw new TypeError('Channel#createCall: deadline must be a number or Date'); + throw new TypeError( + 'Channel#createCall: deadline must be a number or Date' + ); } if (this.connectivityState === ConnectivityState.SHUTDOWN) { throw new Error('Channel has been shut down'); diff --git a/packages/grpc-js/src/client-interceptors.ts b/packages/grpc-js/src/client-interceptors.ts index c20db534..72bbdab9 100644 --- a/packages/grpc-js/src/client-interceptors.ts +++ b/packages/grpc-js/src/client-interceptors.ts @@ -179,10 +179,10 @@ const defaultRequester: FullRequester = { sendMessage: (message, next) => { next(message); }, - halfClose: next => { + halfClose: (next) => { next(); }, - cancel: next => { + cancel: (next) => { next(); }, }; @@ -250,13 +250,13 @@ export class InterceptingCall implements InterceptingCallInterface { const fullInterceptingListener: InterceptingListener = { onReceiveMetadata: interceptingListener?.onReceiveMetadata?.bind(interceptingListener) ?? - (metadata => {}), + ((metadata) => {}), onReceiveMessage: interceptingListener?.onReceiveMessage?.bind(interceptingListener) ?? - (message => {}), + ((message) => {}), onReceiveStatus: interceptingListener?.onReceiveStatus?.bind(interceptingListener) ?? - (status => {}), + ((status) => {}), }; this.requester.start(metadata, fullInterceptingListener, (md, listener) => { let finalInterceptingListener: InterceptingListener; @@ -281,7 +281,7 @@ export class InterceptingCall implements InterceptingCallInterface { } sendMessageWithContext(context: MessageContext, message: any): void { this.processingMessage = true; - this.requester.sendMessage(message, finalMessage => { + this.requester.sendMessage(message, (finalMessage) => { this.processingMessage = false; this.nextCall.sendMessageWithContext(context, finalMessage); if (this.pendingHalfClose) { @@ -368,10 +368,10 @@ class BaseInterceptingCall implements InterceptingCallInterface { ): void { let readError: StatusObject | null = null; this.call.start(metadata, { - onReceiveMetadata: metadata => { + onReceiveMetadata: (metadata) => { interceptingListener?.onReceiveMetadata?.(metadata); }, - onReceiveMessage: message => { + onReceiveMessage: (message) => { let deserialized: any; try { deserialized = this.methodDefinition.responseDeserialize(message); @@ -385,7 +385,7 @@ class BaseInterceptingCall implements InterceptingCallInterface { this.call.cancelWithStatus(readError.code, readError.details); } }, - onReceiveStatus: status => { + onReceiveStatus: (status) => { if (readError) { interceptingListener?.onReceiveStatus?.(readError); } else { @@ -415,7 +415,7 @@ class BaseUnaryInterceptingCall extends BaseInterceptingCall let receivedMessage = false; const wrapperListener: InterceptingListener = { onReceiveMetadata: - listener?.onReceiveMetadata?.bind(listener) ?? (metadata => {}), + listener?.onReceiveMetadata?.bind(listener) ?? ((metadata) => {}), onReceiveMessage: (message: any) => { receivedMessage = true; listener?.onReceiveMessage?.(message); @@ -502,21 +502,21 @@ export function getInterceptingCall( interceptors = ([] as Interceptor[]) .concat( interceptorArgs.callInterceptors, - interceptorArgs.callInterceptorProviders.map(provider => + interceptorArgs.callInterceptorProviders.map((provider) => provider(methodDefinition) ) ) - .filter(interceptor => interceptor); + .filter((interceptor) => interceptor); // Filter out falsy values when providers return nothing } else { interceptors = ([] as Interceptor[]) .concat( interceptorArgs.clientInterceptors, - interceptorArgs.clientInterceptorProviders.map(provider => + interceptorArgs.clientInterceptorProviders.map((provider) => provider(methodDefinition) ) ) - .filter(interceptor => interceptor); + .filter((interceptor) => interceptor); // Filter out falsy values when providers return nothing } const interceptorOptions = Object.assign({}, options, { @@ -531,14 +531,10 @@ export function getInterceptingCall( * channel. */ const getCall: NextCall = interceptors.reduceRight( (nextCall: NextCall, nextInterceptor: Interceptor) => { - return currentOptions => nextInterceptor(currentOptions, nextCall); + return (currentOptions) => nextInterceptor(currentOptions, nextCall); }, (finalOptions: InterceptorOptions) => - getBottomInterceptingCall( - channel, - finalOptions, - methodDefinition - ) + getBottomInterceptingCall(channel, finalOptions, methodDefinition) ); return getCall(interceptorOptions); } diff --git a/packages/grpc-js/src/client.ts b/packages/grpc-js/src/client.ts index 118375b2..f33de1f7 100644 --- a/packages/grpc-js/src/client.ts +++ b/packages/grpc-js/src/client.ts @@ -48,7 +48,12 @@ import { InterceptorArguments, InterceptingCallInterface, } from './client-interceptors'; -import { ServerUnaryCall, ServerReadableStream, ServerWritableStream, ServerDuplexStream } from './server-call'; +import { + ServerUnaryCall, + ServerReadableStream, + ServerWritableStream, + ServerDuplexStream, +} from './server-call'; const CHANNEL_SYMBOL = Symbol(); const INTERCEPTOR_SYMBOL = Symbol(); @@ -62,7 +67,11 @@ export interface UnaryCallback { export interface CallOptions { deadline?: Deadline; host?: string; - parent?: ServerUnaryCall | ServerReadableStream | ServerWritableStream | ServerDuplexStream + parent?: + | ServerUnaryCall + | ServerReadableStream + | ServerWritableStream + | ServerDuplexStream; propagate_flags?: number; credentials?: CallCredentials; interceptors?: Interceptor[]; @@ -76,11 +85,11 @@ export interface CallProperties { channel: Channel; methodDefinition: ClientMethodDefinition; callOptions: CallOptions; - callback?: UnaryCallback + callback?: UnaryCallback; } export interface CallInvocationTransformer { - (callProperties: CallProperties): CallProperties + (callProperties: CallProperties): CallProperties; } export type ClientOptions = Partial & { @@ -123,7 +132,8 @@ export class Client { 'to the client constructor. Only one of these is allowed.' ); } - this[CALL_INVOCATION_TRANSFORMER_SYMBOL] = options.callInvocationTransformer; + this[CALL_INVOCATION_TRANSFORMER_SYMBOL] = + options.callInvocationTransformer; delete options.callInvocationTransformer; if (options.channelOverride) { this[CHANNEL_SYMBOL] = options.channelOverride; @@ -274,17 +284,20 @@ export class Client { channel: this[CHANNEL_SYMBOL], methodDefinition: methodDefinition, callOptions: checkedArguments.options, - callback: checkedArguments.callback + callback: checkedArguments.callback, }; if (this[CALL_INVOCATION_TRANSFORMER_SYMBOL]) { - callProperties = this[CALL_INVOCATION_TRANSFORMER_SYMBOL]!(callProperties) as CallProperties; + callProperties = this[CALL_INVOCATION_TRANSFORMER_SYMBOL]!( + callProperties + ) as CallProperties; } const emitter: ClientUnaryCall = callProperties.call; const interceptorArgs: InterceptorArguments = { clientInterceptors: this[INTERCEPTOR_SYMBOL], clientInterceptorProviders: this[INTERCEPTOR_PROVIDER_SYMBOL], callInterceptors: callProperties.callOptions.interceptors ?? [], - callInterceptorProviders: callProperties.callOptions.interceptor_providers ?? [], + callInterceptorProviders: + callProperties.callOptions.interceptor_providers ?? [], }; const call: InterceptingCallInterface = getInterceptingCall( interceptorArgs, @@ -303,7 +316,7 @@ export class Client { let responseMessage: ResponseType | null = null; let receivedStatus = false; call.start(callProperties.metadata, { - onReceiveMetadata: metadata => { + onReceiveMetadata: (metadata) => { emitter.emit('metadata', metadata); }, onReceiveMessage(message: any) { @@ -385,17 +398,22 @@ export class Client { channel: this[CHANNEL_SYMBOL], methodDefinition: methodDefinition, callOptions: checkedArguments.options, - callback: checkedArguments.callback + callback: checkedArguments.callback, }; if (this[CALL_INVOCATION_TRANSFORMER_SYMBOL]) { - callProperties = this[CALL_INVOCATION_TRANSFORMER_SYMBOL]!(callProperties) as CallProperties; + callProperties = this[CALL_INVOCATION_TRANSFORMER_SYMBOL]!( + callProperties + ) as CallProperties; } - const emitter: ClientWritableStream = callProperties.call as ClientWritableStream; + const emitter: ClientWritableStream = callProperties.call as ClientWritableStream< + RequestType + >; const interceptorArgs: InterceptorArguments = { clientInterceptors: this[INTERCEPTOR_SYMBOL], clientInterceptorProviders: this[INTERCEPTOR_PROVIDER_SYMBOL], callInterceptors: callProperties.callOptions.interceptors ?? [], - callInterceptorProviders: callProperties.callOptions.interceptor_providers ?? [], + callInterceptorProviders: + callProperties.callOptions.interceptor_providers ?? [], }; const call: InterceptingCallInterface = getInterceptingCall( interceptorArgs, @@ -414,7 +432,7 @@ export class Client { let responseMessage: ResponseType | null = null; let receivedStatus = false; call.start(callProperties.metadata, { - onReceiveMetadata: metadata => { + onReceiveMetadata: (metadata) => { emitter.emit('metadata', metadata); }, onReceiveMessage(message: any) { @@ -503,17 +521,22 @@ export class Client { call: new ClientReadableStreamImpl(deserialize), channel: this[CHANNEL_SYMBOL], methodDefinition: methodDefinition, - callOptions: checkedArguments.options + callOptions: checkedArguments.options, }; if (this[CALL_INVOCATION_TRANSFORMER_SYMBOL]) { - callProperties = this[CALL_INVOCATION_TRANSFORMER_SYMBOL]!(callProperties) as CallProperties; + callProperties = this[CALL_INVOCATION_TRANSFORMER_SYMBOL]!( + callProperties + ) as CallProperties; } - const stream: ClientReadableStream = callProperties.call as ClientReadableStream; + const stream: ClientReadableStream = callProperties.call as ClientReadableStream< + ResponseType + >; const interceptorArgs: InterceptorArguments = { clientInterceptors: this[INTERCEPTOR_SYMBOL], clientInterceptorProviders: this[INTERCEPTOR_PROVIDER_SYMBOL], callInterceptors: callProperties.callOptions.interceptors ?? [], - callInterceptorProviders: callProperties.callOptions.interceptor_providers ?? [], + callInterceptorProviders: + callProperties.callOptions.interceptor_providers ?? [], }; const call: InterceptingCallInterface = getInterceptingCall( interceptorArgs, @@ -589,20 +612,29 @@ export class Client { }; let callProperties: CallProperties = { metadata: checkedArguments.metadata, - call: new ClientDuplexStreamImpl(serialize, deserialize), + call: new ClientDuplexStreamImpl( + serialize, + deserialize + ), channel: this[CHANNEL_SYMBOL], methodDefinition: methodDefinition, - callOptions: checkedArguments.options + callOptions: checkedArguments.options, }; if (this[CALL_INVOCATION_TRANSFORMER_SYMBOL]) { - callProperties = this[CALL_INVOCATION_TRANSFORMER_SYMBOL]!(callProperties) as CallProperties; + callProperties = this[CALL_INVOCATION_TRANSFORMER_SYMBOL]!( + callProperties + ) as CallProperties; } - const stream: ClientDuplexStream = callProperties.call as ClientDuplexStream; + const stream: ClientDuplexStream< + RequestType, + ResponseType + > = callProperties.call as ClientDuplexStream; const interceptorArgs: InterceptorArguments = { clientInterceptors: this[INTERCEPTOR_SYMBOL], clientInterceptorProviders: this[INTERCEPTOR_PROVIDER_SYMBOL], callInterceptors: callProperties.callOptions.interceptors ?? [], - callInterceptorProviders: callProperties.callOptions.interceptor_providers ?? [], + callInterceptorProviders: + callProperties.callOptions.interceptor_providers ?? [], }; const call: InterceptingCallInterface = getInterceptingCall( interceptorArgs, diff --git a/packages/grpc-js/src/filter-stack.ts b/packages/grpc-js/src/filter-stack.ts index 3b6f16ba..a656a409 100644 --- a/packages/grpc-js/src/filter-stack.ts +++ b/packages/grpc-js/src/filter-stack.ts @@ -78,7 +78,7 @@ export class FilterStackFactory implements FilterFactory { createFilter(callStream: Call): FilterStack { return new FilterStack( - this.factories.map(factory => factory.createFilter(callStream)) + this.factories.map((factory) => factory.createFilter(callStream)) ); } } diff --git a/packages/grpc-js/src/http_proxy.ts b/packages/grpc-js/src/http_proxy.ts index ecff1bb5..ef52deda 100644 --- a/packages/grpc-js/src/http_proxy.ts +++ b/packages/grpc-js/src/http_proxy.ts @@ -15,14 +15,18 @@ * */ -import { URL, parse } from "url"; -import { log } from "./logging"; -import { LogVerbosity } from "./constants"; -import { parseTarget } from "./resolver-dns"; -import { Socket } from "net"; +import { URL, parse } from 'url'; +import { log } from './logging'; +import { LogVerbosity } from './constants'; +import { parseTarget } from './resolver-dns'; +import { Socket } from 'net'; import * as http from 'http'; import * as logging from './logging'; -import { SubchannelAddress, TcpSubchannelAddress, isTcpSubchannelAddress } from "./subchannel"; +import { + SubchannelAddress, + TcpSubchannelAddress, + isTcpSubchannelAddress, +} from './subchannel'; const TRACER_NAME = 'proxy'; @@ -36,8 +40,8 @@ interface ProxyInfo { } function getProxyInfo(): ProxyInfo { - let proxyEnv: string = ''; - let envVar: string = ''; + let proxyEnv = ''; + let envVar = ''; /* Prefer using 'grpc_proxy'. Fallback on 'http_proxy' if it is not set. * Also prefer using 'https_proxy' with fallback on 'http_proxy'. The * fallback behavior can be removed if there's a demand for it. @@ -62,7 +66,10 @@ function getProxyInfo(): ProxyInfo { return {}; } if (proxyUrl.protocol !== 'http:') { - log(LogVerbosity.ERROR, `"${proxyUrl.protocol}" scheme not supported in proxy URI`); + log( + LogVerbosity.ERROR, + `"${proxyUrl.protocol}" scheme not supported in proxy URI` + ); return {}; } let userCred: string | null = null; @@ -75,12 +82,14 @@ function getProxyInfo(): ProxyInfo { } } const result: ProxyInfo = { - address: proxyUrl.host + address: proxyUrl.host, }; if (userCred) { result.creds = userCred; } - trace('Proxy server ' + result.address + ' set by environment variable ' + envVar); + trace( + 'Proxy server ' + result.address + ' set by environment variable ' + envVar + ); return result; } @@ -89,7 +98,7 @@ const PROXY_INFO = getProxyInfo(); function getNoProxyHostList(): string[] { /* Prefer using 'no_grpc_proxy'. Fallback on 'no_proxy' if it is not set. */ let noProxyStr: string | undefined = process.env.no_grpc_proxy; - let envVar: string = 'no_grpc_proxy'; + let envVar = 'no_grpc_proxy'; if (!noProxyStr) { noProxyStr = process.env.no_proxy; envVar = 'no_proxy'; @@ -124,20 +133,37 @@ export function shouldUseProxy(target: string): boolean { return true; } -export function getProxiedConnection(target: string, subchannelAddress: SubchannelAddress): Promise { - if (!(PROXY_INFO.address && shouldUseProxy(target) && isTcpSubchannelAddress(subchannelAddress))) { +export function getProxiedConnection( + target: string, + subchannelAddress: SubchannelAddress +): Promise { + if ( + !( + PROXY_INFO.address && + shouldUseProxy(target) && + isTcpSubchannelAddress(subchannelAddress) + ) + ) { return Promise.reject(); } const subchannelAddressPathString = `${subchannelAddress.host}:${subchannelAddress.port}`; - trace('Using proxy ' + PROXY_INFO.address + ' to connect to ' + target + ' at ' + subchannelAddress); + trace( + 'Using proxy ' + + PROXY_INFO.address + + ' to connect to ' + + target + + ' at ' + + subchannelAddress + ); const options: http.RequestOptions = { method: 'CONNECT', host: PROXY_INFO.address, - path: subchannelAddressPathString + path: subchannelAddressPathString, }; if (PROXY_INFO.creds) { options.headers = { - 'Proxy-Authorization': 'Basic ' + Buffer.from(PROXY_INFO.creds).toString('base64') + 'Proxy-Authorization': + 'Basic ' + Buffer.from(PROXY_INFO.creds).toString('base64'), }; } return new Promise((resolve, reject) => { @@ -146,10 +172,20 @@ export function getProxiedConnection(target: string, subchannelAddress: Subchann request.removeAllListeners(); socket.removeAllListeners(); if (res.statusCode === http.STATUS_CODES.OK) { - trace('Successfully connected to ' + subchannelAddress + ' through proxy ' + PROXY_INFO.address); + trace( + 'Successfully connected to ' + + subchannelAddress + + ' through proxy ' + + PROXY_INFO.address + ); resolve(socket); } else { - trace('Failed to connect to ' + subchannelAddress + ' through proxy ' + PROXY_INFO.address); + trace( + 'Failed to connect to ' + + subchannelAddress + + ' through proxy ' + + PROXY_INFO.address + ); reject(); } }); diff --git a/packages/grpc-js/src/index.ts b/packages/grpc-js/src/index.ts index 5c77a3ff..8324ff4c 100644 --- a/packages/grpc-js/src/index.ts +++ b/packages/grpc-js/src/index.ts @@ -28,7 +28,12 @@ import { CallCredentials } from './call-credentials'; import { Deadline, StatusObject } from './call-stream'; import { Channel, ConnectivityState, ChannelImplementation } from './channel'; import { ChannelCredentials } from './channel-credentials'; -import { CallOptions, Client, CallInvocationTransformer, CallProperties } from './client'; +import { + CallOptions, + Client, + CallInvocationTransformer, + CallProperties, +} from './client'; import { LogVerbosity, Status } from './constants'; import * as logging from './logging'; import { @@ -129,14 +134,14 @@ export const credentials = mixin( }); } getHeaders.then( - headers => { + (headers) => { const metadata = new Metadata(); for (const key of Object.keys(headers)) { metadata.add(key, headers[key]); } callback(null, metadata); }, - err => { + (err) => { callback(err); } ); @@ -202,7 +207,7 @@ export { CallProperties, CallInvocationTransformer, ChannelImplementation as Channel, - Channel as ChannelInterface + Channel as ChannelInterface, }; /** diff --git a/packages/grpc-js/src/load-balancer-pick-first.ts b/packages/grpc-js/src/load-balancer-pick-first.ts index e0e1768c..6b9756ff 100644 --- a/packages/grpc-js/src/load-balancer-pick-first.ts +++ b/packages/grpc-js/src/load-balancer-pick-first.ts @@ -338,11 +338,11 @@ export class PickFirstLoadBalancer implements LoadBalancer { this.resetSubchannelList(); trace( 'Connect to address list ' + - this.latestAddressList.map(address => + this.latestAddressList.map((address) => subchannelAddressToString(address) ) ); - this.subchannels = this.latestAddressList.map(address => + this.subchannels = this.latestAddressList.map((address) => this.channelControlHelper.createSubchannel(address, {}) ); for (const subchannel of this.subchannels) { diff --git a/packages/grpc-js/src/load-balancer-round-robin.ts b/packages/grpc-js/src/load-balancer-round-robin.ts index 863fec77..93c64610 100644 --- a/packages/grpc-js/src/load-balancer-round-robin.ts +++ b/packages/grpc-js/src/load-balancer-round-robin.ts @@ -125,7 +125,7 @@ export class RoundRobinLoadBalancer implements LoadBalancer { private calculateAndUpdateState() { if (this.subchannelStateCounts[ConnectivityState.READY] > 0) { const readySubchannels = this.subchannels.filter( - subchannel => + (subchannel) => subchannel.getConnectivityState() === ConnectivityState.READY ); let index = 0; @@ -192,9 +192,9 @@ export class RoundRobinLoadBalancer implements LoadBalancer { this.resetSubchannelList(); trace( 'Connect to address list ' + - addressList.map(address => subchannelAddressToString(address)) + addressList.map((address) => subchannelAddressToString(address)) ); - this.subchannels = addressList.map(address => + this.subchannels = addressList.map((address) => this.channelControlHelper.createSubchannel(address, {}) ); for (const subchannel of this.subchannels) { diff --git a/packages/grpc-js/src/make-client.ts b/packages/grpc-js/src/make-client.ts index f55f4917..99daef15 100644 --- a/packages/grpc-js/src/make-client.ts +++ b/packages/grpc-js/src/make-client.ts @@ -116,7 +116,7 @@ export function makeClientConstructor( [methodName: string]: Function; } - Object.keys(methods).forEach(name => { + Object.keys(methods).forEach((name) => { const attrs = methods[name]; let methodType: keyof typeof requesterFuncs; // TODO(murgatroid99): Verify that we don't need this anymore @@ -164,7 +164,7 @@ function partial( serialize: Function, deserialize: Function ): Function { - return function(this: any, ...args: any[]) { + return function (this: any, ...args: any[]) { return fn.call(this, path, serialize, deserialize, ...args); }; } diff --git a/packages/grpc-js/src/metadata.ts b/packages/grpc-js/src/metadata.ts index 630fb99a..0ffe0af2 100644 --- a/packages/grpc-js/src/metadata.ts +++ b/packages/grpc-js/src/metadata.ts @@ -178,7 +178,7 @@ export class Metadata { const newInternalRepr = newMetadata.internalRepr; this.internalRepr.forEach((value, key) => { - const clonedValue: MetadataValue[] = value.map(v => { + const clonedValue: MetadataValue[] = value.map((v) => { if (v instanceof Buffer) { return Buffer.from(v); } else { @@ -226,7 +226,7 @@ export class Metadata { this.internalRepr.forEach((values, key) => { // We assume that the user's interaction with this object is limited to // through its public API (i.e. keys and values are already validated). - result[key] = values.map(value => { + result[key] = values.map((value) => { if (value instanceof Buffer) { return value.toString('base64'); } else { @@ -249,7 +249,7 @@ export class Metadata { */ static fromHttp2Headers(headers: http2.IncomingHttpHeaders): Metadata { const result = new Metadata(); - Object.keys(headers).forEach(key => { + Object.keys(headers).forEach((key) => { // Reserved headers (beginning with `:`) are not valid keys. if (key.charAt(0) === ':') { return; @@ -260,12 +260,12 @@ export class Metadata { try { if (isBinaryKey(key)) { if (Array.isArray(values)) { - values.forEach(value => { + values.forEach((value) => { result.add(key, Buffer.from(value, 'base64')); }); } else if (values !== undefined) { if (isCustomMetadata(key)) { - values.split(',').forEach(v => { + values.split(',').forEach((v) => { result.add(key, Buffer.from(v.trim(), 'base64')); }); } else { @@ -274,12 +274,12 @@ export class Metadata { } } else { if (Array.isArray(values)) { - values.forEach(value => { + values.forEach((value) => { result.add(key, value); }); } else if (values !== undefined) { if (isCustomMetadata(key)) { - values.split(',').forEach(v => result.add(key, v.trim())); + values.split(',').forEach((v) => result.add(key, v.trim())); } else { result.add(key, values); } diff --git a/packages/grpc-js/src/resolver-dns.ts b/packages/grpc-js/src/resolver-dns.ts index 82727a42..a42a898f 100644 --- a/packages/grpc-js/src/resolver-dns.ts +++ b/packages/grpc-js/src/resolver-dns.ts @@ -109,7 +109,7 @@ function mergeArrays(...arrays: T[][]): T[] { i < Math.max.apply( null, - arrays.map(array => array.length) + arrays.map((array) => array.length) ); i++ ) { @@ -186,50 +186,56 @@ class DnsResolver implements Resolver { * if the name exists but there are no records for that family, and that * error is indistinguishable from other kinds of errors */ this.pendingLookupPromise = dnsLookupPromise(hostname, { all: true }); - this.pendingLookupPromise.then(addressList => { - this.pendingLookupPromise = null; - const ip4Addresses: dns.LookupAddress[] = addressList.filter( - addr => addr.family === 4 - ); - const ip6Addresses: dns.LookupAddress[] = addressList.filter(addr => addr.family === 6); - this.latestLookupResult = mergeArrays( - ip6Addresses, - ip4Addresses - ).map(addr => ({ host: addr.address, port: +this.port! })); - const allAddressesString: string = - '[' + - this.latestLookupResult.map(addr => addr.host + ':' + addr.port).join(',') + - ']'; - trace( - 'Resolved addresses for target ' + - this.target + - ': ' + - allAddressesString - ); - if (this.latestLookupResult.length === 0) { + this.pendingLookupPromise.then( + (addressList) => { + this.pendingLookupPromise = null; + const ip4Addresses: dns.LookupAddress[] = addressList.filter( + (addr) => addr.family === 4 + ); + const ip6Addresses: dns.LookupAddress[] = addressList.filter( + (addr) => addr.family === 6 + ); + this.latestLookupResult = mergeArrays( + ip6Addresses, + ip4Addresses + ).map((addr) => ({ host: addr.address, port: +this.port! })); + const allAddressesString: string = + '[' + + this.latestLookupResult + .map((addr) => addr.host + ':' + addr.port) + .join(',') + + ']'; + trace( + 'Resolved addresses for target ' + + this.target + + ': ' + + allAddressesString + ); + if (this.latestLookupResult.length === 0) { + this.listener.onError(this.defaultResolutionError); + return; + } + /* If the TXT lookup has not yet finished, both of the last two + * arguments will be null, which is the equivalent of getting an + * empty TXT response. When the TXT lookup does finish, its handler + * can update the service config by using the same address list */ + this.listener.onSuccessfulResolution( + this.latestLookupResult, + this.latestServiceConfig, + this.latestServiceConfigError + ); + }, + (err) => { + trace( + 'Resolution error for target ' + + this.target + + ': ' + + (err as Error).message + ); + this.pendingLookupPromise = null; this.listener.onError(this.defaultResolutionError); - return; } - /* If the TXT lookup has not yet finished, both of the last two - * arguments will be null, which is the equivalent of getting an - * empty TXT response. When the TXT lookup does finish, its handler - * can update the service config by using the same address list */ - this.listener.onSuccessfulResolution( - this.latestLookupResult, - this.latestServiceConfig, - this.latestServiceConfigError - ); - }, - err => { - trace( - 'Resolution error for target ' + - this.target + - ': ' + - (err as Error).message - ); - this.pendingLookupPromise = null; - this.listener.onError(this.defaultResolutionError); - }); + ); /* If there already is a still-pending TXT resolution, we can just use * that result when it comes in */ if (this.pendingTxtPromise === null) { @@ -237,45 +243,48 @@ class DnsResolver implements Resolver { * the name resolution attempt as a whole is a success even if the TXT * lookup fails */ this.pendingTxtPromise = resolveTxtPromise(hostname); - this.pendingTxtPromise.then(txtRecord => { - this.pendingTxtPromise = null; - try { - this.latestServiceConfig = extractAndSelectServiceConfig( - txtRecord, - this.percentage - ); - } catch (err) { + this.pendingTxtPromise.then( + (txtRecord) => { + this.pendingTxtPromise = null; + try { + this.latestServiceConfig = extractAndSelectServiceConfig( + txtRecord, + this.percentage + ); + } catch (err) { + this.latestServiceConfigError = { + code: Status.UNAVAILABLE, + details: 'Parsing service config failed', + metadata: new Metadata(), + }; + } + if (this.latestLookupResult !== null) { + /* We rely here on the assumption that calling this function with + * identical parameters will be essentialy idempotent, and calling + * it with the same address list and a different service config + * should result in a fast and seamless switchover. */ + this.listener.onSuccessfulResolution( + this.latestLookupResult, + this.latestServiceConfig, + this.latestServiceConfigError + ); + } + }, + (err) => { this.latestServiceConfigError = { code: Status.UNAVAILABLE, - details: 'Parsing service config failed', + details: 'TXT query failed', metadata: new Metadata(), }; + if (this.latestLookupResult !== null) { + this.listener.onSuccessfulResolution( + this.latestLookupResult, + this.latestServiceConfig, + this.latestServiceConfigError + ); + } } - if (this.latestLookupResult !== null) { - /* We rely here on the assumption that calling this function with - * identical parameters will be essentialy idempotent, and calling - * it with the same address list and a different service config - * should result in a fast and seamless switchover. */ - this.listener.onSuccessfulResolution( - this.latestLookupResult, - this.latestServiceConfig, - this.latestServiceConfigError - ) - } - }, err => { - this.latestServiceConfigError = { - code: Status.UNAVAILABLE, - details: 'TXT query failed', - metadata: new Metadata(), - }; - if (this.latestLookupResult !== null) { - this.listener.onSuccessfulResolution( - this.latestLookupResult, - this.latestServiceConfig, - this.latestServiceConfigError - ) - } - }); + ); } } } @@ -329,11 +338,15 @@ export interface dnsUrl { } export function parseTarget(target: string): dnsUrl | null { - const match = IPV4_REGEX.exec(target) ?? IPV6_REGEX.exec(target) ?? IPV6_BRACKET_REGEX.exec(target) ?? DNS_REGEX.exec(target) + const match = + IPV4_REGEX.exec(target) ?? + IPV6_REGEX.exec(target) ?? + IPV6_BRACKET_REGEX.exec(target) ?? + DNS_REGEX.exec(target); if (match) { return { host: match[1], - port: match[2] ?? undefined + port: match[2] ?? undefined, }; } else { return null; diff --git a/packages/grpc-js/src/server-call.ts b/packages/grpc-js/src/server-call.ts index d6b55ae8..240d8b53 100644 --- a/packages/grpc-js/src/server-call.ts +++ b/packages/grpc-js/src/server-call.ts @@ -157,7 +157,7 @@ export class ServerWritableStreamImpl this.trailingMetadata = new Metadata(); this.call.setupSurfaceCall(this); - this.on('error', err => { + this.on('error', (err) => { this.call.sendError(err); this.end(); }); @@ -226,7 +226,7 @@ export class ServerDuplexStreamImpl extends Duplex this.call.setupSurfaceCall(this); this.call.setupReadable(this); - this.on('error', err => { + this.on('error', (err) => { this.call.sendError(err); this.end(); }); @@ -562,7 +562,7 @@ export class Http2ServerCallStream< } setupSurfaceCall(call: ServerSurfaceCall) { - this.once('cancelled', reason => { + this.once('cancelled', (reason) => { call.cancelled = true; call.emit('cancelled', reason); }); diff --git a/packages/grpc-js/src/server.ts b/packages/grpc-js/src/server.ts index c37f69f7..1d851f1c 100644 --- a/packages/grpc-js/src/server.ts +++ b/packages/grpc-js/src/server.ts @@ -48,7 +48,11 @@ import { ServerCredentials } from './server-credentials'; import { ChannelOptions } from './channel-options'; import { createResolver, ResolverListener } from './resolver'; import { log } from './logging'; -import { SubchannelAddress, TcpSubchannelAddress, isTcpSubchannelAddress } from './subchannel'; +import { + SubchannelAddress, + TcpSubchannelAddress, + isTcpSubchannelAddress, +} from './subchannel'; interface BindResult { port: number; @@ -152,7 +156,7 @@ export class Server { throw new Error('Cannot add an empty service to a server'); } - serviceKeys.forEach(name => { + serviceKeys.forEach((name) => { const attrs = service[name]; let methodType: HandlerType; @@ -244,62 +248,72 @@ export class Server { http2Server.setTimeout(0, noop); this._setupHandlers(http2Server); return http2Server; - } + }; - const bindSpecificPort = (addressList: SubchannelAddress[], portNum: number, previousCount: number): Promise => { + const bindSpecificPort = ( + addressList: SubchannelAddress[], + portNum: number, + previousCount: number + ): Promise => { if (addressList.length === 0) { - return Promise.resolve({port: portNum, count: previousCount}); + return Promise.resolve({ port: portNum, count: previousCount }); } - return Promise.all(addressList.map(address => { - let addr: SubchannelAddress; - if (isTcpSubchannelAddress(address)) { - addr = { - host: (address as TcpSubchannelAddress).host, - port: portNum - }; - } else { - addr = address - } - - const http2Server = setupServer(); - return new Promise((resolve, reject) => { - function onError(err: Error): void { - resolve(err); + return Promise.all( + addressList.map((address) => { + let addr: SubchannelAddress; + if (isTcpSubchannelAddress(address)) { + addr = { + host: (address as TcpSubchannelAddress).host, + port: portNum, + }; + } else { + addr = address; } - http2Server.once('error', onError); - - http2Server.listen(addr, () => { - this.http2ServerList.push(http2Server); - const boundAddress = http2Server.address()!; - if (typeof boundAddress === 'string') { - resolve(portNum); - } else { - resolve(boundAddress.port); + const http2Server = setupServer(); + return new Promise((resolve, reject) => { + function onError(err: Error): void { + resolve(err); } - http2Server.removeListener('error', onError); + + http2Server.once('error', onError); + + http2Server.listen(addr, () => { + this.http2ServerList.push(http2Server); + const boundAddress = http2Server.address()!; + if (typeof boundAddress === 'string') { + resolve(portNum); + } else { + resolve(boundAddress.port); + } + http2Server.removeListener('error', onError); + }); }); }) - })).then(results => { + ).then((results) => { let count = 0; for (const result of results) { if (typeof result === 'number') { count += 1; if (result !== portNum) { - throw new Error('Invalid state: multiple port numbers added from single address'); + throw new Error( + 'Invalid state: multiple port numbers added from single address' + ); } } } return { port: portNum, - count: count + previousCount + count: count + previousCount, }; }); - } + }; - const bindWildcardPort = (addressList: SubchannelAddress[]): Promise => { + const bindWildcardPort = ( + addressList: SubchannelAddress[] + ): Promise => { if (addressList.length === 0) { - return Promise.resolve({port: 0, count: 0}); + return Promise.resolve({ port: 0, count: 0 }); } const address = addressList[0]; const http2Server = setupServer(); @@ -312,16 +326,26 @@ export class Server { http2Server.listen(address, () => { this.http2ServerList.push(http2Server); - resolve(bindSpecificPort(addressList.slice(1), (http2Server.address() as AddressInfo).port, 1)); + resolve( + bindSpecificPort( + addressList.slice(1), + (http2Server.address() as AddressInfo).port, + 1 + ) + ); http2Server.removeListener('error', onError); }); }); - } + }; const resolverListener: ResolverListener = { - onSuccessfulResolution: (addressList, serviceConfig, serviceConfigError) => { + onSuccessfulResolution: ( + addressList, + serviceConfig, + serviceConfigError + ) => { // We only want one resolution result. Discard all future results - resolverListener.onSuccessfulResolution = () => {} + resolverListener.onSuccessfulResolution = () => {}; if (addressList.length === 0) { callback(new Error(`No addresses resolved for port ${port}`), 0); return; @@ -331,32 +355,42 @@ export class Server { if (addressList[0].port === 0) { bindResultPromise = bindWildcardPort(addressList); } else { - bindResultPromise = bindSpecificPort(addressList, addressList[0].port, 0); + bindResultPromise = bindSpecificPort( + addressList, + addressList[0].port, + 0 + ); } - } else{ + } else { // Use an arbitrary non-zero port for non-TCP addresses bindResultPromise = bindSpecificPort(addressList, 1, 0); } - bindResultPromise.then(bindResult => { - if (bindResult.count === 0) { + bindResultPromise.then( + (bindResult) => { + if (bindResult.count === 0) { + const errorString = `No address added out of total ${addressList.length} resolved`; + log(LogVerbosity.ERROR, errorString); + callback(new Error(errorString), 0); + } else { + if (bindResult.count < addressList.length) { + log( + LogVerbosity.INFO, + `WARNING Only ${bindResult.count} addresses added out of total ${addressList.length} resolved` + ); + } + callback(null, bindResult.port); + } + }, + (error) => { const errorString = `No address added out of total ${addressList.length} resolved`; log(LogVerbosity.ERROR, errorString); callback(new Error(errorString), 0); - } else { - if (bindResult.count < addressList.length) { - log(LogVerbosity.INFO, `WARNING Only ${bindResult.count} addresses added out of total ${addressList.length} resolved`); - } - callback(null, bindResult.port); } - }, (error) => { - const errorString = `No address added out of total ${addressList.length} resolved`; - log(LogVerbosity.ERROR, errorString); - callback(new Error(errorString), 0); - }); + ); }, onError: (error) => { callback(new Error(error.details), 0); - } + }, }; const resolver = createResolver(port, resolverListener); @@ -376,7 +410,7 @@ export class Server { // Always destroy any available sessions. It's possible that one or more // tryShutdown() calls are in progress. Don't wait on them to finish. - this.sessions.forEach(session => { + this.sessions.forEach((session) => { // Cast NGHTTP2_CANCEL to any because TypeScript doesn't seem to // recognize destroy(code) as a valid signature. session.destroy(http2.constants.NGHTTP2_CANCEL as any); @@ -405,7 +439,12 @@ export class Server { } start(): void { - if (this.http2ServerList.length === 0 || this.http2ServerList.every(http2Server => http2Server.listening !== true)) { + if ( + this.http2ServerList.length === 0 || + this.http2ServerList.every( + (http2Server) => http2Server.listening !== true + ) + ) { throw new Error('server must be bound in order to start'); } @@ -439,7 +478,7 @@ export class Server { // If any sessions are active, close them gracefully. pendingChecks += this.sessions.size; - this.sessions.forEach(session => { + this.sessions.forEach((session) => { session.close(maybeCallback); }); if (pendingChecks === 0) { @@ -451,7 +490,9 @@ export class Server { throw new Error('Not yet implemented'); } - private _setupHandlers(http2Server: http2.Http2Server | http2.Http2SecureServer): void { + private _setupHandlers( + http2Server: http2.Http2Server | http2.Http2SecureServer + ): void { if (http2Server === null) { return; } @@ -525,7 +566,7 @@ export class Server { } ); - http2Server.on('session', session => { + http2Server.on('session', (session) => { if (!this.started) { session.destroy(); return; diff --git a/packages/grpc-js/src/subchannel-pool.ts b/packages/grpc-js/src/subchannel-pool.ts index 634d2575..9f143729 100644 --- a/packages/grpc-js/src/subchannel-pool.ts +++ b/packages/grpc-js/src/subchannel-pool.ts @@ -67,7 +67,7 @@ export class SubchannelPool { const subchannelObjArray = this.pool[channelTarget]; const refedSubchannels = subchannelObjArray.filter( - value => !value.subchannel.unrefIfOneRef() + (value) => !value.subchannel.unrefIfOneRef() ); if (refedSubchannels.length > 0) { diff --git a/packages/grpc-js/src/subchannel.ts b/packages/grpc-js/src/subchannel.ts index 64799a49..c21e3305 100644 --- a/packages/grpc-js/src/subchannel.ts +++ b/packages/grpc-js/src/subchannel.ts @@ -210,7 +210,7 @@ export class Subchannel { `grpc-node-js/${clientVersion}`, options['grpc.secondary_user_agent'], ] - .filter(e => e) + .filter((e) => e) .join(' '); // remove falsey values first if ('grpc.keepalive_time_ms' in options) { @@ -311,8 +311,8 @@ export class Subchannel { return socket; } else { /* net.NetConnectOpts is declared in a way that is more restrictive - * than what net.connect will actually accept, so we use the type - * assertion to work around that. */ + * than what net.connect will actually accept, so we use the type + * assertion to work around that. */ return net.connect(this.subchannelAddress); } }; @@ -397,7 +397,7 @@ export class Subchannel { } } ); - session.once('error', error => { + session.once('error', (error) => { /* Do nothing here. Any error should also trigger a close event, which is * where we want to handle that. */ trace( @@ -410,11 +410,17 @@ export class Subchannel { private startConnectingInternal() { if (shouldUseProxy(this.channelTarget)) { - getProxiedConnection(this.channelTarget, this.subchannelAddress).then((socket) => { - this.createSession(socket); - }, (reason) => { - this.transitionToState([ConnectivityState.CONNECTING], ConnectivityState.TRANSIENT_FAILURE); - }); + getProxiedConnection(this.channelTarget, this.subchannelAddress).then( + (socket) => { + this.createSession(socket); + }, + (reason) => { + this.transitionToState( + [ConnectivityState.CONNECTING], + ConnectivityState.TRANSIENT_FAILURE + ); + } + ); } else { this.createSession(); } @@ -589,7 +595,7 @@ export class Subchannel { const http2Stream = this.session!.request(headers); let headersString = ''; for (const header of Object.keys(headers)) { - headersString += '\t\t' + header + ': ' + headers[header] + '\n' + headersString += '\t\t' + header + ': ' + headers[header] + '\n'; } trace('Starting stream with headers\n' + headersString); callStream.attachHttp2Stream(http2Stream, this);