From 6c2bc599e55dc5b374603c2e5527b0165064b693 Mon Sep 17 00:00:00 2001 From: Michael Lumish Date: Tue, 27 Feb 2024 12:51:38 -0800 Subject: [PATCH] grpc-js: Run code formatter, fix one lint error --- packages/grpc-js/src/backoff-timeout.ts | 4 +- packages/grpc-js/src/index.ts | 11 +- packages/grpc-js/src/internal-channel.ts | 8 +- .../grpc-js/src/load-balancer-pick-first.ts | 23 +- .../grpc-js/src/load-balancer-round-robin.ts | 4 +- packages/grpc-js/src/load-balancing-call.ts | 4 +- packages/grpc-js/src/logging.ts | 13 +- packages/grpc-js/src/resolver-dns.ts | 9 +- .../grpc-js/src/resolving-load-balancer.ts | 7 +- packages/grpc-js/src/server-call.ts | 15 +- packages/grpc-js/src/server-interceptors.ts | 186 +++++++---- packages/grpc-js/src/server.ts | 305 +++++++++++------- packages/grpc-js/src/service-config.ts | 15 +- packages/grpc-js/src/transport.ts | 7 +- packages/grpc-js/test/common.ts | 5 +- packages/grpc-js/test/test-confg-parsing.ts | 89 ++--- packages/grpc-js/test/test-pick-first.ts | 73 +++-- .../grpc-js/test/test-server-interceptors.ts | 135 +++++--- packages/grpc-js/test/test-server.ts | 139 +++++--- 19 files changed, 698 insertions(+), 354 deletions(-) diff --git a/packages/grpc-js/src/backoff-timeout.ts b/packages/grpc-js/src/backoff-timeout.ts index 78318d1e..10d347e7 100644 --- a/packages/grpc-js/src/backoff-timeout.ts +++ b/packages/grpc-js/src/backoff-timeout.ts @@ -106,7 +106,9 @@ export class BackoffTimeout { private runTimer(delay: number) { this.endTime = this.startTime; - this.endTime.setMilliseconds(this.endTime.getMilliseconds() + this.nextDelay); + this.endTime.setMilliseconds( + this.endTime.getMilliseconds() + this.nextDelay + ); clearTimeout(this.timerId); this.timerId = setTimeout(() => { this.callback(); diff --git a/packages/grpc-js/src/index.ts b/packages/grpc-js/src/index.ts index b37f6110..c766a371 100644 --- a/packages/grpc-js/src/index.ts +++ b/packages/grpc-js/src/index.ts @@ -183,7 +183,7 @@ export { ServiceDefinition, UntypedHandleCall, UntypedServiceImplementation, - VerifyOptions + VerifyOptions, }; /**** Server ****/ @@ -263,7 +263,12 @@ export { getChannelzServiceDefinition, getChannelzHandlers } from './channelz'; export { addAdminServicesToServer } from './admin'; -export { ServiceConfig, LoadBalancingConfig, MethodConfig, RetryPolicy } from './service-config'; +export { + ServiceConfig, + LoadBalancingConfig, + MethodConfig, + RetryPolicy, +} from './service-config'; export { ServerListener, @@ -274,7 +279,7 @@ export { ResponderBuilder, ServerInterceptingCallInterface, ServerInterceptingCall, - ServerInterceptor + ServerInterceptor, } from './server-interceptors'; import * as experimental from './experimental'; diff --git a/packages/grpc-js/src/internal-channel.ts b/packages/grpc-js/src/internal-channel.ts index be140522..823c935a 100644 --- a/packages/grpc-js/src/internal-channel.ts +++ b/packages/grpc-js/src/internal-channel.ts @@ -583,7 +583,8 @@ export class InternalChannel { return; } const now = new Date(); - const timeSinceLastActivity = now.valueOf() - this.lastActivityTimestamp.valueOf(); + const timeSinceLastActivity = + now.valueOf() - this.lastActivityTimestamp.valueOf(); if (timeSinceLastActivity >= this.idleTimeoutMs) { this.trace( 'Idle timer triggered after ' + @@ -603,7 +604,10 @@ export class InternalChannel { } private maybeStartIdleTimer() { - if (this.connectivityState !== ConnectivityState.SHUTDOWN && !this.idleTimer) { + if ( + this.connectivityState !== ConnectivityState.SHUTDOWN && + !this.idleTimer + ) { this.startIdleTimeout(this.idleTimeoutMs); } } diff --git a/packages/grpc-js/src/load-balancer-pick-first.ts b/packages/grpc-js/src/load-balancer-pick-first.ts index 02796fea..29bbfbf0 100644 --- a/packages/grpc-js/src/load-balancer-pick-first.ts +++ b/packages/grpc-js/src/load-balancer-pick-first.ts @@ -198,7 +198,12 @@ export class PickFirstLoadBalancer implements LoadBalancer { keepaliveTime, errorMessage ) => { - this.onSubchannelStateUpdate(subchannel, previousState, newState, errorMessage); + this.onSubchannelStateUpdate( + subchannel, + previousState, + newState, + errorMessage + ); }; private pickedSubchannelHealthListener: HealthListener = () => @@ -275,7 +280,9 @@ export class PickFirstLoadBalancer implements LoadBalancer { if (this.stickyTransientFailureMode) { this.updateState( ConnectivityState.TRANSIENT_FAILURE, - new UnavailablePicker({details: `No connection established. Last error: ${this.lastError}`}) + new UnavailablePicker({ + details: `No connection established. Last error: ${this.lastError}`, + }) ); } else { this.updateState(ConnectivityState.CONNECTING, new QueuePicker(this)); @@ -441,7 +448,12 @@ export class PickFirstLoadBalancer implements LoadBalancer { private resetSubchannelList() { for (const child of this.children) { - if (!(this.currentPick && child.subchannel.realSubchannelEquals(this.currentPick))) { + if ( + !( + this.currentPick && + child.subchannel.realSubchannelEquals(this.currentPick) + ) + ) { /* The connectivity state listener is the same whether the subchannel * is in the list of children or it is the currentPick, so if it is in * both, removing it here would cause problems. In particular, that @@ -523,7 +535,10 @@ export class PickFirstLoadBalancer implements LoadBalancer { } exitIdle() { - if (this.currentState === ConnectivityState.IDLE && this.latestAddressList) { + if ( + this.currentState === ConnectivityState.IDLE && + this.latestAddressList + ) { this.connectToAddressList(this.latestAddressList); } } diff --git a/packages/grpc-js/src/load-balancer-round-robin.ts b/packages/grpc-js/src/load-balancer-round-robin.ts index 7c38569e..7e70c554 100644 --- a/packages/grpc-js/src/load-balancer-round-robin.ts +++ b/packages/grpc-js/src/load-balancer-round-robin.ts @@ -156,7 +156,9 @@ export class RoundRobinLoadBalancer implements LoadBalancer { ) { this.updateState( ConnectivityState.TRANSIENT_FAILURE, - new UnavailablePicker({details: `No connection established. Last error: ${this.lastError}`}) + new UnavailablePicker({ + details: `No connection established. Last error: ${this.lastError}`, + }) ); } else { this.updateState(ConnectivityState.IDLE, new QueuePicker(this)); diff --git a/packages/grpc-js/src/load-balancing-call.ts b/packages/grpc-js/src/load-balancing-call.ts index 87ef0249..25a36553 100644 --- a/packages/grpc-js/src/load-balancing-call.ts +++ b/packages/grpc-js/src/load-balancing-call.ts @@ -145,7 +145,9 @@ export class LoadBalancingCall implements Call { * metadata generation finished, we shouldn't do anything with * it. */ if (this.ended) { - this.trace('Credentials metadata generation finished after call ended'); + this.trace( + 'Credentials metadata generation finished after call ended' + ); return; } finalMetadata.merge(credsMetadata); diff --git a/packages/grpc-js/src/logging.ts b/packages/grpc-js/src/logging.ts index e1b396ff..2279d3b6 100644 --- a/packages/grpc-js/src/logging.ts +++ b/packages/grpc-js/src/logging.ts @@ -112,7 +112,18 @@ export function trace( text: string ): void { if (isTracerEnabled(tracer)) { - log(severity, new Date().toISOString() + ' | v' + clientVersion + ' ' + pid + ' | ' + tracer + ' | ' + text); + log( + severity, + new Date().toISOString() + + ' | v' + + clientVersion + + ' ' + + pid + + ' | ' + + tracer + + ' | ' + + text + ); } } diff --git a/packages/grpc-js/src/resolver-dns.ts b/packages/grpc-js/src/resolver-dns.ts index 978f1442..6652839b 100644 --- a/packages/grpc-js/src/resolver-dns.ts +++ b/packages/grpc-js/src/resolver-dns.ts @@ -335,9 +335,14 @@ class DnsResolver implements Resolver { if (this.pendingLookupPromise === null) { if (this.isNextResolutionTimerRunning || this.backoff.isRunning()) { if (this.isNextResolutionTimerRunning) { - trace('resolution update delayed by "min time between resolutions" rate limit'); + trace( + 'resolution update delayed by "min time between resolutions" rate limit' + ); } else { - trace('resolution update delayed by backoff timer until ' + this.backoff.getEndTime().toISOString()); + trace( + 'resolution update delayed by backoff timer until ' + + this.backoff.getEndTime().toISOString() + ); } this.continueResolving = true; } else { diff --git a/packages/grpc-js/src/resolving-load-balancer.ts b/packages/grpc-js/src/resolving-load-balancer.ts index a8de2019..82c4ff43 100644 --- a/packages/grpc-js/src/resolving-load-balancer.ts +++ b/packages/grpc-js/src/resolving-load-balancer.ts @@ -223,8 +223,11 @@ export class ResolvingLoadBalancer implements LoadBalancer { * In that case, the backoff timer callback will call * updateResolution */ if (this.backoffTimeout.isRunning()) { - trace('requestReresolution delayed by backoff timer until ' + this.backoffTimeout.getEndTime().toISOString()); - this.continueResolving = true; + trace( + 'requestReresolution delayed by backoff timer until ' + + this.backoffTimeout.getEndTime().toISOString() + ); + this.continueResolving = true; } else { this.updateResolution(); } diff --git a/packages/grpc-js/src/server-call.ts b/packages/grpc-js/src/server-call.ts index edc38e98..95393fba 100644 --- a/packages/grpc-js/src/server-call.ts +++ b/packages/grpc-js/src/server-call.ts @@ -18,9 +18,7 @@ import { EventEmitter } from 'events'; import { Duplex, Readable, Writable } from 'stream'; -import { - Status, -} from './constants'; +import { Status } from './constants'; import { Deserialize, Serialize } from './make-client'; import { Metadata } from './metadata'; import { ObjectReadable, ObjectWritable } from './object-stream'; @@ -56,11 +54,14 @@ export type ServerDuplexStream = ServerSurfaceCall & ObjectReadable & ObjectWritable & { end: (metadata?: Metadata) => void }; -export function serverErrorToStatus(error: ServerErrorResponse | ServerStatusResponse, overrideTrailers?: Metadata | undefined): PartialStatusObject { +export function serverErrorToStatus( + error: ServerErrorResponse | ServerStatusResponse, + overrideTrailers?: Metadata | undefined +): PartialStatusObject { const status: PartialStatusObject = { code: Status.UNKNOWN, details: 'message' in error ? error.message : 'Unknown Error', - metadata: overrideTrailers ?? error.metadata ?? null + metadata: overrideTrailers ?? error.metadata ?? null, }; if ( @@ -154,7 +155,7 @@ export class ServerWritableStreamImpl private trailingMetadata: Metadata; private pendingStatus: PartialStatusObject = { code: Status.OK, - details: 'OK' + details: 'OK', }; constructor( @@ -224,7 +225,7 @@ export class ServerDuplexStreamImpl private trailingMetadata: Metadata; private pendingStatus: PartialStatusObject = { code: Status.OK, - details: 'OK' + details: 'OK', }; constructor( diff --git a/packages/grpc-js/src/server-interceptors.ts b/packages/grpc-js/src/server-interceptors.ts index c03f3028..60c5c886 100644 --- a/packages/grpc-js/src/server-interceptors.ts +++ b/packages/grpc-js/src/server-interceptors.ts @@ -15,19 +15,24 @@ * */ -import { PartialStatusObject} from "./call-interface"; -import { ServerMethodDefinition } from "./make-client"; -import { Metadata } from "./metadata"; -import { ChannelOptions } from "./channel-options"; -import { Handler, ServerErrorResponse } from "./server-call"; -import { Deadline } from "./deadline"; -import { DEFAULT_MAX_RECEIVE_MESSAGE_LENGTH, DEFAULT_MAX_SEND_MESSAGE_LENGTH, LogVerbosity, Status } from "./constants"; +import { PartialStatusObject } from './call-interface'; +import { ServerMethodDefinition } from './make-client'; +import { Metadata } from './metadata'; +import { ChannelOptions } from './channel-options'; +import { Handler, ServerErrorResponse } from './server-call'; +import { Deadline } from './deadline'; +import { + DEFAULT_MAX_RECEIVE_MESSAGE_LENGTH, + DEFAULT_MAX_SEND_MESSAGE_LENGTH, + LogVerbosity, + Status, +} from './constants'; import * as http2 from 'http2'; -import { getErrorMessage } from "./error"; +import { getErrorMessage } from './error'; import * as zlib from 'zlib'; -import { promisify } from "util"; -import { StreamDecoder } from "./stream-decoder"; -import { CallEventTracker } from "./transport"; +import { promisify } from 'util'; +import { StreamDecoder } from './stream-decoder'; +import { CallEventTracker } from './transport'; import * as logging from './logging'; const unzip = promisify(zlib.unzip); @@ -96,7 +101,7 @@ export class ServerListenerBuilder { onReceiveMetadata: this.metadata, onReceiveMessage: this.message, onReceiveHalfClose: this.halfClose, - onCancel: this.cancel + onCancel: this.cancel, }; } } @@ -109,22 +114,30 @@ export interface InterceptingServerListener { onCancel(): void; } -export function isInterceptingServerListener(listener: ServerListener | InterceptingServerListener): listener is InterceptingServerListener { - return listener.onReceiveMetadata !== undefined && listener.onReceiveMetadata.length === 1; +export function isInterceptingServerListener( + listener: ServerListener | InterceptingServerListener +): listener is InterceptingServerListener { + return ( + listener.onReceiveMetadata !== undefined && + listener.onReceiveMetadata.length === 1 + ); } class InterceptingServerListenerImpl implements InterceptingServerListener { /** * Once the call is cancelled, ignore all other events. */ - private cancelled: boolean = false; - private processingMetadata: boolean = false; - private hasPendingMessage: boolean = false; + private cancelled = false; + private processingMetadata = false; + private hasPendingMessage = false; private pendingMessage: any = null; - private processingMessage: boolean = false; - private hasPendingHalfClose: boolean = false; + private processingMessage = false; + private hasPendingHalfClose = false; - constructor(private listener: FullServerListener, private nextListener: InterceptingServerListener) {} + constructor( + private listener: FullServerListener, + private nextListener: InterceptingServerListener + ) {} private processPendingMessage() { if (this.hasPendingMessage) { @@ -195,7 +208,6 @@ class InterceptingServerListenerImpl implements InterceptingServerListener { this.listener.onCancel(); this.nextListener.onCancel(); } - } export interface StartResponder { @@ -212,7 +224,10 @@ export interface MessageResponder { } export interface StatusResponder { - (status: PartialStatusObject, next: (status: PartialStatusObject) => void): void; + ( + status: PartialStatusObject, + next: (status: PartialStatusObject) => void + ): void; } export interface FullResponder { @@ -255,7 +270,7 @@ export class ResponderBuilder { start: this.start, sendMetadata: this.metadata, sendMessage: this.message, - sendStatus: this.status + sendStatus: this.status, }; } } @@ -270,11 +285,11 @@ const defaultServerListener: FullServerListener = { onReceiveHalfClose: next => { next(); }, - onCancel: () => {} + onCancel: () => {}, }; const defaultResponder: FullResponder = { - start: (next) => { + start: next => { next(); }, sendMetadata: (metadata, next) => { @@ -285,7 +300,7 @@ const defaultResponder: FullResponder = { }, sendStatus: (status, next) => { next(status); - } + }, }; export interface ServerInterceptingCallInterface { @@ -321,18 +336,24 @@ export interface ServerInterceptingCallInterface { export class ServerInterceptingCall implements ServerInterceptingCallInterface { private responder: FullResponder; - private processingMetadata: boolean = false; - private processingMessage: boolean = false; + private processingMetadata = false; + private processingMessage = false; private pendingMessage: any = null; private pendingMessageCallback: (() => void) | null = null; private pendingStatus: PartialStatusObject | null = null; - constructor(private nextCall: ServerInterceptingCallInterface, responder?: Responder) { - this.responder = {...defaultResponder, ...responder}; + constructor( + private nextCall: ServerInterceptingCallInterface, + responder?: Responder + ) { + this.responder = { ...defaultResponder, ...responder }; } private processPendingMessage() { if (this.pendingMessageCallback) { - this.nextCall.sendMessage(this.pendingMessage, this.pendingMessageCallback); + this.nextCall.sendMessage( + this.pendingMessage, + this.pendingMessageCallback + ); this.pendingMessage = null; this.pendingMessageCallback = null; } @@ -347,8 +368,14 @@ export class ServerInterceptingCall implements ServerInterceptingCallInterface { start(listener: InterceptingServerListener): void { this.responder.start(interceptedListener => { - const fullInterceptedListener: FullServerListener = {...defaultServerListener, ...interceptedListener}; - const finalInterceptingListener = new InterceptingServerListenerImpl(fullInterceptedListener, listener); + const fullInterceptedListener: FullServerListener = { + ...defaultServerListener, + ...interceptedListener, + }; + const finalInterceptingListener = new InterceptingServerListenerImpl( + fullInterceptedListener, + listener + ); this.nextCall.start(finalInterceptingListener); }); } @@ -394,7 +421,10 @@ export class ServerInterceptingCall implements ServerInterceptingCallInterface { } export interface ServerInterceptor { - (methodDescriptor: ServerMethodDefinition, call: ServerInterceptingCallInterface): ServerInterceptingCall; + ( + methodDescriptor: ServerMethodDefinition, + call: ServerInterceptingCallInterface + ): ServerInterceptingCall; } interface DeadlineUnitIndexSignature { @@ -438,7 +468,9 @@ interface ReadQueueEntry { parsedMessage: any; } -export class BaseServerInterceptingCall implements ServerInterceptingCallInterface { +export class BaseServerInterceptingCall + implements ServerInterceptingCallInterface +{ private listener: InterceptingServerListener | null = null; private metadata: Metadata; private deadlineTimer: NodeJS.Timeout | null = null; @@ -449,7 +481,7 @@ export class BaseServerInterceptingCall implements ServerInterceptingCallInterfa private metadataSent = false; private wantTrailers = false; private cancelNotified = false; - private incomingEncoding: string = 'identity'; + private incomingEncoding = 'identity'; private decoder = new StreamDecoder(); private readQueue: ReadQueueEntry[] = []; private isReadPending = false; @@ -485,7 +517,7 @@ export class BaseServerInterceptingCall implements ServerInterceptingCallInterfa this.callEventTracker.onCallEnd({ code: Status.CANCELLED, details: 'Stream closed before sending status', - metadata: null + metadata: null, }); } @@ -548,7 +580,7 @@ export class BaseServerInterceptingCall implements ServerInterceptingCallInterfa const status: PartialStatusObject = { code: Status.INTERNAL, details: `Invalid ${GRPC_TIMEOUT_HEADER} value "${timeoutHeader}"`, - metadata: null + metadata: null, }; // Wait for the constructor to complete before sending the error. process.nextTick(() => { @@ -565,11 +597,10 @@ export class BaseServerInterceptingCall implements ServerInterceptingCallInterfa const status: PartialStatusObject = { code: Status.DEADLINE_EXCEEDED, details: 'Deadline exceeded', - metadata: null + metadata: null, }; this.sendStatus(status); }, timeout); - } private checkCancelled(): boolean { @@ -650,14 +681,19 @@ export class BaseServerInterceptingCall implements ServerInterceptingCallInterfa } const compressed = queueEntry.compressedMessage!.readUInt8(0) === 1; - const compressedMessageEncoding = compressed ? this.incomingEncoding : 'identity'; - const decompressedMessage = await this.decompressMessage(queueEntry.compressedMessage!, compressedMessageEncoding); + const compressedMessageEncoding = compressed + ? this.incomingEncoding + : 'identity'; + const decompressedMessage = await this.decompressMessage( + queueEntry.compressedMessage!, + compressedMessageEncoding + ); try { queueEntry.parsedMessage = this.handler.deserialize(decompressedMessage); } catch (err) { this.sendStatus({ code: Status.INTERNAL, - details: `Error deserializing request: ${(err as Error).message}` + details: `Error deserializing request: ${(err as Error).message}`, }); return; } @@ -666,7 +702,12 @@ export class BaseServerInterceptingCall implements ServerInterceptingCallInterfa } private maybePushNextMessage() { - if (this.listener && this.isReadPending && this.readQueue.length > 0 && this.readQueue[0].type !== 'COMPRESSED') { + if ( + this.listener && + this.isReadPending && + this.readQueue.length > 0 && + this.readQueue[0].type !== 'COMPRESSED' + ) { this.isReadPending = false; const nextQueueEntry = this.readQueue.shift()!; if (nextQueueEntry.type === 'READABLE') { @@ -682,23 +723,33 @@ export class BaseServerInterceptingCall implements ServerInterceptingCallInterfa if (this.checkCancelled()) { return; } - trace('Request to ' + this.handler.path + ' received data frame of size ' + data.length); + trace( + 'Request to ' + + this.handler.path + + ' received data frame of size ' + + data.length + ); const rawMessages = this.decoder.write(data); for (const messageBytes of rawMessages) { this.stream.pause(); - if (this.maxReceiveMessageSize !== -1 && messageBytes.length - 5 > this.maxReceiveMessageSize) { + if ( + this.maxReceiveMessageSize !== -1 && + messageBytes.length - 5 > this.maxReceiveMessageSize + ) { this.sendStatus({ code: Status.RESOURCE_EXHAUSTED, - details: `Received message larger than max (${messageBytes.length - 5} vs. ${this.maxReceiveMessageSize})`, - metadata: null + details: `Received message larger than max (${ + messageBytes.length - 5 + } vs. ${this.maxReceiveMessageSize})`, + metadata: null, }); return; } const queueEntry: ReadQueueEntry = { type: 'COMPRESSED', compressedMessage: messageBytes, - parsedMessage: null + parsedMessage: null, }; this.readQueue.push(queueEntry); this.decompressAndMaybePush(queueEntry); @@ -709,7 +760,7 @@ export class BaseServerInterceptingCall implements ServerInterceptingCallInterfa this.readQueue.push({ type: 'HALF_CLOSE', compressedMessage: null, - parsedMessage: null + parsedMessage: null, }); this.receivedHalfClose = true; this.maybePushNextMessage(); @@ -751,7 +802,7 @@ export class BaseServerInterceptingCall implements ServerInterceptingCallInterfa this.sendStatus({ code: Status.INTERNAL, details: `Error serializing response: ${getErrorMessage(e)}`, - metadata: null + metadata: null, }); return; } @@ -763,18 +814,23 @@ export class BaseServerInterceptingCall implements ServerInterceptingCallInterfa this.sendStatus({ code: Status.RESOURCE_EXHAUSTED, details: `Sent message larger than max (${response.length} vs. ${this.maxSendMessageSize})`, - metadata: null + metadata: null, }); return; } this.maybeSendMetadata(); - trace('Request to ' + this.handler.path + ' sent data frame of size ' + response.length); + trace( + 'Request to ' + + this.handler.path + + ' sent data frame of size ' + + response.length + ); this.stream.write(response, error => { if (error) { this.sendStatus({ code: Status.INTERNAL, details: `Error writing message: ${getErrorMessage(error)}`, - metadata: null + metadata: null, }); return; } @@ -871,16 +927,24 @@ export function getServerInterceptingCall( handler: Handler, options: ChannelOptions ) { - const methodDefinition: ServerMethodDefinition = { path: handler.path, requestStream: handler.type === 'clientStream' || handler.type === 'bidi', responseStream: handler.type === 'serverStream' || handler.type === 'bidi', requestDeserialize: handler.deserialize, - responseSerialize: handler.serialize - } - const baseCall = new BaseServerInterceptingCall(stream, headers, callEventTracker, handler, options); - return interceptors.reduce((call: ServerInterceptingCallInterface, interceptor: ServerInterceptor) => { - return interceptor(methodDefinition, call); - }, baseCall); + responseSerialize: handler.serialize, + }; + const baseCall = new BaseServerInterceptingCall( + stream, + headers, + callEventTracker, + handler, + options + ); + return interceptors.reduce( + (call: ServerInterceptingCallInterface, interceptor: ServerInterceptor) => { + return interceptor(methodDefinition, call); + }, + baseCall + ); } diff --git a/packages/grpc-js/src/server.ts b/packages/grpc-js/src/server.ts index 31851b83..46bd22ea 100644 --- a/packages/grpc-js/src/server.ts +++ b/packages/grpc-js/src/server.ts @@ -55,7 +55,13 @@ import { subchannelAddressToString, stringToSubchannelAddress, } from './subchannel-address'; -import { GrpcUri, combineHostPort, parseUri, splitHostPort, uriToString } from './uri-parser'; +import { + GrpcUri, + combineHostPort, + parseUri, + splitHostPort, + uriToString, +} from './uri-parser'; import { ChannelzCallTracker, ChannelzChildrenTracker, @@ -70,7 +76,11 @@ import { unregisterChannelzRef, } from './channelz'; import { CipherNameAndProtocol, TLSSocket } from 'tls'; -import { ServerInterceptingCallInterface, ServerInterceptor, getServerInterceptingCall } from './server-interceptors'; +import { + ServerInterceptingCallInterface, + ServerInterceptor, + getServerInterceptingCall, +} from './server-interceptors'; import { PartialStatusObject } from './call-interface'; import { CallEventTracker } from './transport'; @@ -103,9 +113,15 @@ function noop(): void {} * @returns */ function deprecate(message: string) { - return function (target: (this: This, ...args: Args) => Return, context: ClassMethodDecoratorContext Return>) { + return function ( + target: (this: This, ...args: Args) => Return, + context: ClassMethodDecoratorContext< + This, + (this: This, ...args: Args) => Return + > + ) { return util.deprecate(target, message); - } + }; } function getUnimplementedStatusResponse( @@ -209,7 +225,7 @@ interface BoundPort { * that expands to multiple addresses will result in multiple listening * servers. */ - listeningServers: Set + listeningServers: Set; } /** @@ -221,11 +237,11 @@ interface Http2ServerInfo { } export interface ServerOptions extends ChannelOptions { - interceptors?: ServerInterceptor[] + interceptors?: ServerInterceptor[]; } export class Server { - private boundPorts: Map= new Map(); + private boundPorts: Map = new Map(); private http2Servers: Map = new Map(); private handlers: Map = new Map< @@ -526,10 +542,11 @@ export class Server { return http2Server; } - private bindOneAddress(address: SubchannelAddress, boundPortObject: BoundPort): Promise { - this.trace( - 'Attempting to bind ' + subchannelAddressToString(address) - ); + private bindOneAddress( + address: SubchannelAddress, + boundPortObject: BoundPort + ): Promise { + this.trace('Attempting to bind ' + subchannelAddressToString(address)); const http2Server = this.createHttp2Server(boundPortObject.credentials); return new Promise((resolve, reject) => { const onError = (err: Error) => { @@ -541,7 +558,7 @@ export class Server { ); resolve({ port: 'port' in address ? address.port : 1, - error: err.message + error: err.message, }); }; @@ -561,13 +578,15 @@ export class Server { }; } - const channelzRef = this.registerListenerToChannelz(boundSubchannelAddress); + const channelzRef = this.registerListenerToChannelz( + boundSubchannelAddress + ); if (this.channelzEnabled) { this.listenerChildrenTracker.refChild(channelzRef); } this.http2Servers.set(http2Server, { channelzRef: channelzRef, - sessions: new Set() + sessions: new Set(), }); boundPortObject.listeningServers.add(http2Server); this.trace( @@ -575,62 +594,86 @@ export class Server { subchannelAddressToString(boundSubchannelAddress) ); resolve({ - port: 'port' in boundSubchannelAddress - ? boundSubchannelAddress.port - : 1 + port: + 'port' in boundSubchannelAddress ? boundSubchannelAddress.port : 1, }); http2Server.removeListener('error', onError); }); }); } - private async bindManyPorts(addressList: SubchannelAddress[], boundPortObject: BoundPort): Promise { + private async bindManyPorts( + addressList: SubchannelAddress[], + boundPortObject: BoundPort + ): Promise { if (addressList.length === 0) { return { count: 0, port: 0, - errors: [] + errors: [], }; } if (isTcpSubchannelAddress(addressList[0]) && addressList[0].port === 0) { /* If binding to port 0, first try to bind the first address, then bind * the rest of the address list to the specific port that it binds. */ - const firstAddressResult = await this.bindOneAddress(addressList[0], boundPortObject); + const firstAddressResult = await this.bindOneAddress( + addressList[0], + boundPortObject + ); if (firstAddressResult.error) { /* If the first address fails to bind, try the same operation starting * from the second item in the list. */ - const restAddressResult = await this.bindManyPorts(addressList.slice(1), boundPortObject); + const restAddressResult = await this.bindManyPorts( + addressList.slice(1), + boundPortObject + ); return { ...restAddressResult, - errors: [firstAddressResult.error, ...restAddressResult.errors] + errors: [firstAddressResult.error, ...restAddressResult.errors], }; } else { - const restAddresses = addressList.slice(1).map(address => isTcpSubchannelAddress(address) ? {host: address.host, port: firstAddressResult.port} : address) - const restAddressResult = await Promise.all(restAddresses.map(address => this.bindOneAddress(address, boundPortObject))); + const restAddresses = addressList + .slice(1) + .map(address => + isTcpSubchannelAddress(address) + ? { host: address.host, port: firstAddressResult.port } + : address + ); + const restAddressResult = await Promise.all( + restAddresses.map(address => + this.bindOneAddress(address, boundPortObject) + ) + ); const allResults = [firstAddressResult, ...restAddressResult]; return { count: allResults.filter(result => result.error === undefined).length, port: firstAddressResult.port, - errors: allResults.filter(result => result.error).map(result => result.error!) + errors: allResults + .filter(result => result.error) + .map(result => result.error!), }; } } else { - const allResults = await Promise.all(addressList.map(address => this.bindOneAddress(address, boundPortObject))); + const allResults = await Promise.all( + addressList.map(address => + this.bindOneAddress(address, boundPortObject) + ) + ); return { count: allResults.filter(result => result.error === undefined).length, port: allResults[0].port, - errors: allResults.filter(result => result.error).map(result => result.error!) + errors: allResults + .filter(result => result.error) + .map(result => result.error!), }; } } - private async bindAddressList(addressList: SubchannelAddress[], boundPortObject: BoundPort): Promise { - let bindResult: BindResult; - try { - bindResult = await this.bindManyPorts(addressList, boundPortObject); - } catch (error) { - throw error; - } + private async bindAddressList( + addressList: SubchannelAddress[], + boundPortObject: BoundPort + ): Promise { + const bindResult = await this.bindManyPorts(addressList, boundPortObject); if (bindResult.count > 0) { if (bindResult.count < addressList.length) { logging.log( @@ -642,7 +685,9 @@ export class Server { } else { const errorString = `No address added out of total ${addressList.length} resolved`; logging.log(LogVerbosity.ERROR, errorString); - throw new Error(`${errorString} errors: [${bindResult.errors.join(',')}]`); + throw new Error( + `${errorString} errors: [${bindResult.errors.join(',')}]` + ); } } @@ -660,9 +705,7 @@ export class Server { ...endpointList.map(endpoint => endpoint.addresses) ); if (addressList.length === 0) { - reject( - new Error(`No addresses resolved for port ${port}`) - ); + reject(new Error(`No addresses resolved for port ${port}`)); return; } resolve(addressList); @@ -676,7 +719,10 @@ export class Server { }); } - private async bindPort(port: GrpcUri, boundPortObject: BoundPort): Promise { + private async bindPort( + port: GrpcUri, + boundPortObject: BoundPort + ): Promise { const addressList = await this.resolvePort(port); if (boundPortObject.cancelled) { this.completeUnbind(boundPortObject); @@ -691,7 +737,6 @@ export class Server { } private normalizePort(port: string): GrpcUri { - const initialPortUri = parseUri(port); if (initialPortUri === null) { throw new Error(`Could not parse port "${port}"`); @@ -736,14 +781,20 @@ export class Server { let boundPortObject = this.boundPorts.get(uriToString(portUri)); if (boundPortObject) { if (!creds._equals(boundPortObject.credentials)) { - deferredCallback(new Error(`${port} already bound with incompatible credentials`), 0); + deferredCallback( + new Error(`${port} already bound with incompatible credentials`), + 0 + ); return; } /* If that operation has previously been cancelled by an unbind call, * uncancel it. */ boundPortObject.cancelled = false; if (boundPortObject.completionPromise) { - boundPortObject.completionPromise.then(portNum => callback(null, portNum), error => callback(error as Error, 0)); + boundPortObject.completionPromise.then( + portNum => callback(null, portNum), + error => callback(error as Error, 0) + ); } else { deferredCallback(null, boundPortObject.portNumber); } @@ -756,7 +807,7 @@ export class Server { cancelled: false, portNumber: 0, credentials: creds, - listeningServers: new Set() + listeningServers: new Set(), }; const splitPort = splitHostPort(portUri.path); const completionPromise = this.bindPort(portUri, boundPortObject); @@ -765,34 +816,42 @@ export class Server { * bind operation completes and we have a specific port number. Otherwise, * populate it immediately. */ if (splitPort?.port === 0) { - completionPromise.then(portNum => { - const finalUri: GrpcUri = { - scheme: portUri.scheme, - authority: portUri.authority, - path: combineHostPort({host: splitPort.host, port: portNum}) - }; - boundPortObject!.mapKey = uriToString(finalUri); - boundPortObject!.completionPromise = null; - boundPortObject!.portNumber = portNum; - this.boundPorts.set(boundPortObject!.mapKey, boundPortObject!); - callback(null, portNum); - }, error => { - callback(error, 0); - }) + completionPromise.then( + portNum => { + const finalUri: GrpcUri = { + scheme: portUri.scheme, + authority: portUri.authority, + path: combineHostPort({ host: splitPort.host, port: portNum }), + }; + boundPortObject!.mapKey = uriToString(finalUri); + boundPortObject!.completionPromise = null; + boundPortObject!.portNumber = portNum; + this.boundPorts.set(boundPortObject!.mapKey, boundPortObject!); + callback(null, portNum); + }, + error => { + callback(error, 0); + } + ); } else { this.boundPorts.set(boundPortObject.mapKey, boundPortObject); - completionPromise.then(portNum => { - boundPortObject!.completionPromise = null; - boundPortObject!.portNumber = portNum; - callback(null, portNum); - }, error => { - callback(error, 0); - }); + completionPromise.then( + portNum => { + boundPortObject!.completionPromise = null; + boundPortObject!.portNumber = portNum; + callback(null, portNum); + }, + error => { + callback(error, 0); + } + ); } } private closeServer(server: AnyHttp2Server, callback?: () => void) { - this.trace('Closing server with address ' + JSON.stringify(server.address())); + this.trace( + 'Closing server with address ' + JSON.stringify(server.address()) + ); const serverInfo = this.http2Servers.get(server); server.close(() => { if (this.channelzEnabled && serverInfo) { @@ -802,10 +861,12 @@ export class Server { this.http2Servers.delete(server); callback?.(); }); - } - private closeSession(session: http2.ServerHttp2Session, callback?: () => void) { + private closeSession( + session: http2.ServerHttp2Session, + callback?: () => void + ) { this.trace('Closing session initiated by ' + session.socket?.remoteAddress); const sessionInfo = this.sessions.get(session); const closeCallback = () => { @@ -854,7 +915,12 @@ export class Server { } const boundPortObject = this.boundPorts.get(uriToString(portUri)); if (boundPortObject) { - this.trace('unbinding ' + boundPortObject.mapKey + ' originally bound as ' + uriToString(boundPortObject.originalUri)); + this.trace( + 'unbinding ' + + boundPortObject.mapKey + + ' originally bound as ' + + uriToString(boundPortObject.originalUri) + ); /* If the bind operation is pending, the cancelled flag will trigger * the unbind operation later. */ if (boundPortObject.completionPromise) { @@ -964,13 +1030,13 @@ export class Server { /** * @deprecated No longer needed as of version 1.10.x */ - @deprecate('Calling start() is no longer necessary. It can be safely omitted.') + @deprecate( + 'Calling start() is no longer necessary. It can be safely omitted.' + ) start(): void { if ( this.http2Servers.size === 0 || - [...this.http2Servers.keys()].every( - server => !server.listening - ) + [...this.http2Servers.keys()].every(server => !server.listening) ) { throw new Error('server must be bound in order to start'); } @@ -1090,9 +1156,9 @@ export class Server { 'grpc-message': err.details, [http2.constants.HTTP2_HEADER_STATUS]: http2.constants.HTTP_STATUS_OK, [http2.constants.HTTP2_HEADER_CONTENT_TYPE]: 'application/grpc+proto', - ...err.metadata?.toHttp2Headers() + ...err.metadata?.toHttp2Headers(), }; - stream.respond(trailersToSend, {endStream: true}); + stream.respond(trailersToSend, { endStream: true }); if (this.channelzEnabled) { this.callTracker.addCallFailed(); @@ -1129,7 +1195,7 @@ export class Server { return; } - let callEventTracker: CallEventTracker = { + const callEventTracker: CallEventTracker = { addMessageSent: () => { if (channelzSessionInfo) { channelzSessionInfo.messagesSent += 1; @@ -1157,10 +1223,17 @@ export class Server { channelzSessionInfo.streamTracker.addCallFailed(); } } - } - } + }, + }; - const call = getServerInterceptingCall(this.interceptors, stream, headers, callEventTracker, handler, this.options); + const call = getServerInterceptingCall( + this.interceptors, + stream, + headers, + callEventTracker, + handler, + this.options + ); if (!this._runHandlerForCall(call, handler)) { this.callTracker.addCallFailed(); @@ -1193,7 +1266,14 @@ export class Server { return; } - const call = getServerInterceptingCall(this.interceptors, stream, headers, null, handler, this.options); + const call = getServerInterceptingCall( + this.interceptors, + stream, + headers, + null, + handler, + this.options + ); if (!this._runHandlerForCall(call, handler)) { call.sendStatus({ @@ -1207,25 +1287,15 @@ export class Server { call: ServerInterceptingCallInterface, handler: Handler ): boolean { - const { type } = handler; if (type === 'unary') { handleUnary(call, handler as UntypedUnaryHandler); } else if (type === 'clientStream') { - handleClientStreaming( - call, - handler as UntypedClientStreamingHandler - ); + handleClientStreaming(call, handler as UntypedClientStreamingHandler); } else if (type === 'serverStream') { - handleServerStreaming( - call, - handler as UntypedServerStreamingHandler - ); + handleServerStreaming(call, handler as UntypedServerStreamingHandler); } else if (type === 'bidi') { - handleBidiStreaming( - call, - handler as UntypedBidiStreamingHandler - ); + handleBidiStreaming(call, handler as UntypedBidiStreamingHandler); } else { return false; } @@ -1257,7 +1327,6 @@ export class Server { http2Server.on('stream', handler.bind(this)); http2Server.on('session', session => { - const channelzRef = registerChannelzSocket( session.socket.remoteAddress ?? 'unknown', this.getChannelzSessionInfoGetter(session), @@ -1388,7 +1457,7 @@ async function handleUnary( call.sendStatus({ code: Status.OK, details: 'OK', - metadata: trailer ?? null + metadata: trailer ?? null, }); }); } @@ -1405,7 +1474,7 @@ async function handleUnary( call.sendStatus({ code: Status.UNIMPLEMENTED, details: `Received a second request message for server streaming method ${handler.path}`, - metadata: null + metadata: null, }); return; } @@ -1417,18 +1486,25 @@ async function handleUnary( call.sendStatus({ code: Status.UNIMPLEMENTED, details: `Received no request message for server streaming method ${handler.path}`, - metadata: null + metadata: null, }); return; } - stream = new ServerWritableStreamImpl(handler.path, call, requestMetadata, requestMessage); + stream = new ServerWritableStreamImpl( + handler.path, + call, + requestMetadata, + requestMessage + ); try { handler.func(stream, respond); } catch (err) { call.sendStatus({ code: Status.UNKNOWN, - details: `Server method handler threw error ${(err as Error).message}`, - metadata: null + details: `Server method handler threw error ${ + (err as Error).message + }`, + metadata: null, }); } }, @@ -1461,7 +1537,7 @@ function handleClientStreaming( call.sendStatus({ code: Status.OK, details: 'OK', - metadata: trailer ?? null + metadata: trailer ?? null, }); }); } @@ -1474,8 +1550,10 @@ function handleClientStreaming( } catch (err) { call.sendStatus({ code: Status.UNKNOWN, - details: `Server method handler threw error ${(err as Error).message}`, - metadata: null + details: `Server method handler threw error ${ + (err as Error).message + }`, + metadata: null, }); } }, @@ -1513,7 +1591,7 @@ function handleServerStreaming( call.sendStatus({ code: Status.UNIMPLEMENTED, details: `Received a second request message for server streaming method ${handler.path}`, - metadata: null + metadata: null, }); return; } @@ -1525,18 +1603,25 @@ function handleServerStreaming( call.sendStatus({ code: Status.UNIMPLEMENTED, details: `Received no request message for server streaming method ${handler.path}`, - metadata: null + metadata: null, }); return; } - stream = new ServerWritableStreamImpl(handler.path, call, requestMetadata, requestMessage); + stream = new ServerWritableStreamImpl( + handler.path, + call, + requestMetadata, + requestMessage + ); try { handler.func(stream); } catch (err) { call.sendStatus({ code: Status.UNKNOWN, - details: `Server method handler threw error ${(err as Error).message}`, - metadata: null + details: `Server method handler threw error ${ + (err as Error).message + }`, + metadata: null, }); } }, @@ -1564,8 +1649,10 @@ function handleBidiStreaming( } catch (err) { call.sendStatus({ code: Status.UNKNOWN, - details: `Server method handler threw error ${(err as Error).message}`, - metadata: null + details: `Server method handler threw error ${ + (err as Error).message + }`, + metadata: null, }); } }, diff --git a/packages/grpc-js/src/service-config.ts b/packages/grpc-js/src/service-config.ts index 5c2ca0d0..b0d0d557 100644 --- a/packages/grpc-js/src/service-config.ts +++ b/packages/grpc-js/src/service-config.ts @@ -356,17 +356,23 @@ export function validateRetryThrottling(obj: any): RetryThrottling { function validateLoadBalancingConfig(obj: any): LoadBalancingConfig { if (!(typeof obj === 'object' && obj !== null)) { - throw new Error(`Invalid loadBalancingConfig: unexpected type ${typeof obj}`); + throw new Error( + `Invalid loadBalancingConfig: unexpected type ${typeof obj}` + ); } const keys = Object.keys(obj); if (keys.length > 1) { - throw new Error(`Invalid loadBalancingConfig: unexpected multiple keys ${keys}`); + throw new Error( + `Invalid loadBalancingConfig: unexpected multiple keys ${keys}` + ); } if (keys.length === 0) { - throw new Error('Invalid loadBalancingConfig: load balancing policy name required'); + throw new Error( + 'Invalid loadBalancingConfig: load balancing policy name required' + ); } return { - [keys[0]]: obj[keys[0]] + [keys[0]]: obj[keys[0]], }; } @@ -385,7 +391,6 @@ export function validateServiceConfig(obj: any): ServiceConfig { if ('loadBalancingConfig' in obj) { if (Array.isArray(obj.loadBalancingConfig)) { for (const config of obj.loadBalancingConfig) { - result.loadBalancingConfig.push(validateLoadBalancingConfig(config)); } } else { diff --git a/packages/grpc-js/src/transport.ts b/packages/grpc-js/src/transport.ts index b2158137..c4941b06 100644 --- a/packages/grpc-js/src/transport.ts +++ b/packages/grpc-js/src/transport.ts @@ -205,7 +205,12 @@ class Http2Transport implements Transport { ) { tooManyPings = true; } - this.trace('connection closed by GOAWAY with code ' + errorCode + ' and data ' + opaqueData?.toString()); + this.trace( + 'connection closed by GOAWAY with code ' + + errorCode + + ' and data ' + + opaqueData?.toString() + ); this.reportDisconnectToOwner(tooManyPings); } ); diff --git a/packages/grpc-js/test/common.ts b/packages/grpc-js/test/common.ts index f6aeed10..88aa129a 100644 --- a/packages/grpc-js/test/common.ts +++ b/packages/grpc-js/test/common.ts @@ -129,7 +129,10 @@ export class TestClient { this.client.echo({}, callback); } - sendRequestWithMetadata(metadata: grpc.Metadata, callback: (error?: grpc.ServiceError) => void) { + sendRequestWithMetadata( + metadata: grpc.Metadata, + callback: (error?: grpc.ServiceError) => void + ) { this.client.echo({}, metadata, callback); } diff --git a/packages/grpc-js/test/test-confg-parsing.ts b/packages/grpc-js/test/test-confg-parsing.ts index 569b83f5..b5b9832a 100644 --- a/packages/grpc-js/test/test-confg-parsing.ts +++ b/packages/grpc-js/test/test-confg-parsing.ts @@ -28,7 +28,7 @@ import parseLoadBalancingConfig = experimental.parseLoadBalancingConfig; */ interface TestCase { name: string; - input: object, + input: object; output?: object; error?: RegExp; } @@ -40,52 +40,52 @@ interface TestCase { * Note: some tests have an expected output that is different from the output, * but all non-error tests additionally verify that parsing the output again * produces the same output. */ -const allTestCases: {[lbPolicyName: string]: TestCase[]} = { +const allTestCases: { [lbPolicyName: string]: TestCase[] } = { pick_first: [ { name: 'no fields set', input: {}, output: { - shuffleAddressList: false - } + shuffleAddressList: false, + }, }, { name: 'shuffleAddressList set', input: { - shuffleAddressList: true - } - } + shuffleAddressList: true, + }, + }, ], round_robin: [ { name: 'no fields set', - input: {} - } + input: {}, + }, ], outlier_detection: [ { name: 'only required fields set', input: { - child_policy: [{round_robin: {}}] + child_policy: [{ round_robin: {} }], }, output: { interval: { seconds: 10, - nanos: 0 + nanos: 0, }, base_ejection_time: { seconds: 30, - nanos: 0 + nanos: 0, }, max_ejection_time: { seconds: 300, - nanos: 0 + nanos: 0, }, max_ejection_percent: 10, success_rate_ejection: undefined, failure_percentage_ejection: undefined, - child_policy: [{round_robin: {}}] - } + child_policy: [{ round_robin: {} }], + }, }, { name: 'all optional fields undefined', @@ -96,53 +96,53 @@ const allTestCases: {[lbPolicyName: string]: TestCase[]} = { max_ejection_percent: undefined, success_rate_ejection: undefined, failure_percentage_ejection: undefined, - child_policy: [{round_robin: {}}] + child_policy: [{ round_robin: {} }], }, output: { interval: { seconds: 10, - nanos: 0 + nanos: 0, }, base_ejection_time: { seconds: 30, - nanos: 0 + nanos: 0, }, max_ejection_time: { seconds: 300, - nanos: 0 + nanos: 0, }, max_ejection_percent: 10, success_rate_ejection: undefined, failure_percentage_ejection: undefined, - child_policy: [{round_robin: {}}] - } + child_policy: [{ round_robin: {} }], + }, }, { name: 'empty ejection configs', input: { success_rate_ejection: {}, failure_percentage_ejection: {}, - child_policy: [{round_robin: {}}] + child_policy: [{ round_robin: {} }], }, output: { interval: { seconds: 10, - nanos: 0 + nanos: 0, }, base_ejection_time: { seconds: 30, - nanos: 0 + nanos: 0, }, max_ejection_time: { seconds: 300, - nanos: 0 + nanos: 0, }, max_ejection_percent: 10, success_rate_ejection: { stdev_factor: 1900, enforcement_percentage: 100, minimum_hosts: 5, - request_volume: 100 + request_volume: 100, }, failure_percentage_ejection: { threshold: 85, @@ -150,30 +150,30 @@ const allTestCases: {[lbPolicyName: string]: TestCase[]} = { minimum_hosts: 5, request_volume: 50, }, - child_policy: [{round_robin: {}}] - } + child_policy: [{ round_robin: {} }], + }, }, { name: 'all fields populated', input: { interval: { seconds: 20, - nanos: 0 + nanos: 0, }, base_ejection_time: { seconds: 40, - nanos: 0 + nanos: 0, }, max_ejection_time: { seconds: 400, - nanos: 0 + nanos: 0, }, max_ejection_percent: 20, success_rate_ejection: { stdev_factor: 1800, enforcement_percentage: 90, minimum_hosts: 4, - request_volume: 200 + request_volume: 200, }, failure_percentage_ejection: { threshold: 95, @@ -181,29 +181,34 @@ const allTestCases: {[lbPolicyName: string]: TestCase[]} = { minimum_hosts: 4, request_volume: 60, }, - child_policy: [{round_robin: {}}] - - } - } - ] -} + child_policy: [{ round_robin: {} }], + }, + }, + ], +}; describe('Load balancing policy config parsing', () => { for (const [lbPolicyName, testCases] of Object.entries(allTestCases)) { describe(lbPolicyName, () => { for (const testCase of testCases) { it(testCase.name, () => { - const lbConfigInput = {[lbPolicyName]: testCase.input}; + const lbConfigInput = { [lbPolicyName]: testCase.input }; if (testCase.error) { assert.throws(() => { parseLoadBalancingConfig(lbConfigInput); }, testCase.error); } else { const expectedOutput = testCase.output ?? testCase.input; - const parsedJson = parseLoadBalancingConfig(lbConfigInput).toJsonObject(); - assert.deepStrictEqual(parsedJson, {[lbPolicyName]: expectedOutput}); + const parsedJson = + parseLoadBalancingConfig(lbConfigInput).toJsonObject(); + assert.deepStrictEqual(parsedJson, { + [lbPolicyName]: expectedOutput, + }); // Test idempotency - assert.deepStrictEqual(parseLoadBalancingConfig(parsedJson).toJsonObject(), parsedJson); + assert.deepStrictEqual( + parseLoadBalancingConfig(parsedJson).toJsonObject(), + parsedJson + ); } }); } diff --git a/packages/grpc-js/test/test-pick-first.ts b/packages/grpc-js/test/test-pick-first.ts index df7a3c74..4c2c319e 100644 --- a/packages/grpc-js/test/test-pick-first.ts +++ b/packages/grpc-js/test/test-pick-first.ts @@ -561,28 +561,43 @@ describe('pick_first load balancing policy', () => { }, updateState: updateStateCallBackForExpectedStateSequence( [ConnectivityState.CONNECTING, ConnectivityState.TRANSIENT_FAILURE], - err => setImmediate(() => { - assert.strictEqual(reresolutionRequestCount, targetReresolutionRequestCount); - done(err); - }) + err => + setImmediate(() => { + assert.strictEqual( + reresolutionRequestCount, + targetReresolutionRequestCount + ); + done(err); + }) ), requestReresolution: () => { reresolutionRequestCount += 1; - } + }, } ); const pickFirst = new PickFirstLoadBalancer(channelControlHelper, {}); - pickFirst.updateAddressList([{ addresses: [{ host: 'localhost', port: 1 }]}], config); + pickFirst.updateAddressList( + [{ addresses: [{ host: 'localhost', port: 1 }] }], + config + ); process.nextTick(() => { subchannels[0].transitionToState(ConnectivityState.TRANSIENT_FAILURE); process.nextTick(() => { - pickFirst.updateAddressList([{ addresses: [{ host: 'localhost', port: 2 }]}], config); + pickFirst.updateAddressList( + [{ addresses: [{ host: 'localhost', port: 2 }] }], + config + ); process.nextTick(() => { subchannels[1].transitionToState(ConnectivityState.TRANSIENT_FAILURE); process.nextTick(() => { - pickFirst.updateAddressList([{ addresses: [{ host: 'localhost', port: 3 }]}], config); + pickFirst.updateAddressList( + [{ addresses: [{ host: 'localhost', port: 3 }] }], + config + ); process.nextTick(() => { - subchannels[2].transitionToState(ConnectivityState.TRANSIENT_FAILURE); + subchannels[2].transitionToState( + ConnectivityState.TRANSIENT_FAILURE + ); }); }); }); @@ -606,22 +621,35 @@ describe('pick_first load balancing policy', () => { }, updateState: updateStateCallBackForExpectedStateSequence( [ConnectivityState.TRANSIENT_FAILURE], - err => setImmediate(() => { - assert.strictEqual(reresolutionRequestCount, targetReresolutionRequestCount); - done(err); - }) + err => + setImmediate(() => { + assert.strictEqual( + reresolutionRequestCount, + targetReresolutionRequestCount + ); + done(err); + }) ), requestReresolution: () => { reresolutionRequestCount += 1; - } + }, } ); const pickFirst = new PickFirstLoadBalancer(channelControlHelper, {}); - pickFirst.updateAddressList([{ addresses: [{ host: 'localhost', port: 1 }]}], config); + pickFirst.updateAddressList( + [{ addresses: [{ host: 'localhost', port: 1 }] }], + config + ); process.nextTick(() => { - pickFirst.updateAddressList([{ addresses: [{ host: 'localhost', port: 2 }]}], config); + pickFirst.updateAddressList( + [{ addresses: [{ host: 'localhost', port: 2 }] }], + config + ); process.nextTick(() => { - pickFirst.updateAddressList([{ addresses: [{ host: 'localhost', port: 2 }]}], config); + pickFirst.updateAddressList( + [{ addresses: [{ host: 'localhost', port: 2 }] }], + config + ); }); }); }); @@ -639,13 +667,20 @@ describe('pick_first load balancing policy', () => { return subchannel; }, updateState: updateStateCallBackForExpectedStateSequence( - [ConnectivityState.READY, ConnectivityState.IDLE, ConnectivityState.READY], + [ + ConnectivityState.READY, + ConnectivityState.IDLE, + ConnectivityState.READY, + ], done ), } ); const pickFirst = new PickFirstLoadBalancer(channelControlHelper, {}); - pickFirst.updateAddressList([{ addresses: [{ host: 'localhost', port: 1 }]}], config); + pickFirst.updateAddressList( + [{ addresses: [{ host: 'localhost', port: 1 }] }], + config + ); process.nextTick(() => { subchannels[0].transitionToState(ConnectivityState.IDLE); process.nextTick(() => { diff --git a/packages/grpc-js/test/test-server-interceptors.ts b/packages/grpc-js/test/test-server-interceptors.ts index b6b5b55f..5e93d32d 100644 --- a/packages/grpc-js/test/test-server-interceptors.ts +++ b/packages/grpc-js/test/test-server-interceptors.ts @@ -26,23 +26,28 @@ const echoService = loadProtoFile(protoFile) const AUTH_HEADER_KEY = 'auth'; const AUTH_HEADER_ALLOWED_VALUE = 'allowed'; -const testAuthInterceptor: grpc.ServerInterceptor = (methodDescriptor, call) => { +const testAuthInterceptor: grpc.ServerInterceptor = ( + methodDescriptor, + call +) => { return new grpc.ServerInterceptingCall(call, { start: next => { const authListener: grpc.ServerListener = { onReceiveMetadata: (metadata, mdNext) => { - if (metadata.get(AUTH_HEADER_KEY)?.[0] !== AUTH_HEADER_ALLOWED_VALUE) { + if ( + metadata.get(AUTH_HEADER_KEY)?.[0] !== AUTH_HEADER_ALLOWED_VALUE + ) { call.sendStatus({ code: grpc.status.UNAUTHENTICATED, - details: 'Auth metadata not correct' + details: 'Auth metadata not correct', }); } else { mdNext(metadata); } - } + }, }; next(authListener); - } + }, }); }; @@ -52,7 +57,7 @@ let eventCounts = { receiveHalfClose: 0, sendMetadata: 0, sendMessage: 0, - sendStatus: 0 + sendStatus: 0, }; function resetEventCounts() { @@ -62,7 +67,7 @@ function resetEventCounts() { receiveHalfClose: 0, sendMetadata: 0, sendMessage: 0, - sendStatus: 0 + sendStatus: 0, }; } @@ -72,7 +77,10 @@ function resetEventCounts() { * @param methodDescription * @param call */ -const testLoggingInterceptor: grpc.ServerInterceptor = (methodDescription, call) => { +const testLoggingInterceptor: grpc.ServerInterceptor = ( + methodDescription, + call +) => { return new grpc.ServerInterceptingCall(call, { start: next => { next({ @@ -87,7 +95,7 @@ const testLoggingInterceptor: grpc.ServerInterceptor = (methodDescription, call) onReceiveHalfClose: hcNext => { eventCounts.receiveHalfClose += 1; hcNext(); - } + }, }); }, sendMetadata: (metadata, mdNext) => { @@ -101,21 +109,24 @@ const testLoggingInterceptor: grpc.ServerInterceptor = (methodDescription, call) sendStatus: (status, statusNext) => { eventCounts.sendStatus += 1; statusNext(status); - } + }, }); }; -const testHeaderInjectionInterceptor: grpc.ServerInterceptor = (methodDescriptor, call) => { +const testHeaderInjectionInterceptor: grpc.ServerInterceptor = ( + methodDescriptor, + call +) => { return new grpc.ServerInterceptingCall(call, { start: next => { const authListener: grpc.ServerListener = { onReceiveMetadata: (metadata, mdNext) => { metadata.set('injected-header', 'present'); mdNext(metadata); - } + }, }; next(authListener); - } + }, }); }; @@ -126,22 +137,29 @@ describe('Server interceptors', () => { /* Tests that an interceptor can entirely prevent the handler from being * invoked, based on the contents of the metadata. */ before(done => { - server = new grpc.Server({interceptors: [testAuthInterceptor]}); + server = new grpc.Server({ interceptors: [testAuthInterceptor] }); server.addService(echoService.service, { echo: ( call: grpc.ServerUnaryCall, callback: grpc.sendUnaryData ) => { // A test will fail if a request makes it to the handler without the correct auth header - assert.strictEqual(call.metadata.get(AUTH_HEADER_KEY)?.[0], AUTH_HEADER_ALLOWED_VALUE); + assert.strictEqual( + call.metadata.get(AUTH_HEADER_KEY)?.[0], + AUTH_HEADER_ALLOWED_VALUE + ); callback(null, call.request); }, }); - server.bindAsync('localhost:0', grpc.ServerCredentials.createInsecure(), (error, port) => { - assert.ifError(error); - client = new TestClient(port, false); - done(); - }); + server.bindAsync( + 'localhost:0', + grpc.ServerCredentials.createInsecure(), + (error, port) => { + assert.ifError(error); + client = new TestClient(port, false); + done(); + } + ); }); after(done => { client.close(); @@ -165,7 +183,7 @@ describe('Server interceptors', () => { let server: grpc.Server; let client: TestClient; before(done => { - server = new grpc.Server({interceptors: [testLoggingInterceptor]}); + server = new grpc.Server({ interceptors: [testLoggingInterceptor] }); server.addService(echoService.service, { echo: ( call: grpc.ServerUnaryCall, @@ -175,11 +193,15 @@ describe('Server interceptors', () => { callback(null, call.request); }, }); - server.bindAsync('localhost:0', grpc.ServerCredentials.createInsecure(), (error, port) => { - assert.ifError(error); - client = new TestClient(port, false); - done(); - }); + server.bindAsync( + 'localhost:0', + grpc.ServerCredentials.createInsecure(), + (error, port) => { + assert.ifError(error); + client = new TestClient(port, false); + done(); + } + ); }); after(done => { client.close(); @@ -197,7 +219,7 @@ describe('Server interceptors', () => { receiveHalfClose: 1, sendMetadata: 1, sendMessage: 1, - sendStatus: 1 + sendStatus: 1, }); done(); }); @@ -207,21 +229,30 @@ describe('Server interceptors', () => { let server: grpc.Server; let client: TestClient; before(done => { - server = new grpc.Server({interceptors: [testHeaderInjectionInterceptor]}); + server = new grpc.Server({ + interceptors: [testHeaderInjectionInterceptor], + }); server.addService(echoService.service, { echo: ( call: grpc.ServerUnaryCall, callback: grpc.sendUnaryData ) => { - assert.strictEqual(call.metadata.get('injected-header')?.[0], 'present'); + assert.strictEqual( + call.metadata.get('injected-header')?.[0], + 'present' + ); callback(null, call.request); }, }); - server.bindAsync('localhost:0', grpc.ServerCredentials.createInsecure(), (error, port) => { - assert.ifError(error); - client = new TestClient(port, false); - done(); - }); + server.bindAsync( + 'localhost:0', + grpc.ServerCredentials.createInsecure(), + (error, port) => { + assert.ifError(error); + client = new TestClient(port, false); + done(); + } + ); }); after(done => { client.close(); @@ -235,23 +266,39 @@ describe('Server interceptors', () => { let server: grpc.Server; let client: TestClient; before(done => { - server = new grpc.Server({interceptors: [testAuthInterceptor, testLoggingInterceptor, testHeaderInjectionInterceptor]}); + server = new grpc.Server({ + interceptors: [ + testAuthInterceptor, + testLoggingInterceptor, + testHeaderInjectionInterceptor, + ], + }); server.addService(echoService.service, { echo: ( call: grpc.ServerUnaryCall, callback: grpc.sendUnaryData ) => { - assert.strictEqual(call.metadata.get(AUTH_HEADER_KEY)?.[0], AUTH_HEADER_ALLOWED_VALUE); - assert.strictEqual(call.metadata.get('injected-header')?.[0], 'present'); + assert.strictEqual( + call.metadata.get(AUTH_HEADER_KEY)?.[0], + AUTH_HEADER_ALLOWED_VALUE + ); + assert.strictEqual( + call.metadata.get('injected-header')?.[0], + 'present' + ); call.sendMetadata(new grpc.Metadata()); callback(null, call.request); }, }); - server.bindAsync('localhost:0', grpc.ServerCredentials.createInsecure(), (error, port) => { - assert.ifError(error); - client = new TestClient(port, false); - done(); - }); + server.bindAsync( + 'localhost:0', + grpc.ServerCredentials.createInsecure(), + (error, port) => { + assert.ifError(error); + client = new TestClient(port, false); + done(); + } + ); }); after(done => { client.close(); @@ -271,7 +318,7 @@ describe('Server interceptors', () => { receiveHalfClose: 0, sendMetadata: 0, sendMessage: 0, - sendStatus: 0 + sendStatus: 0, }); done(); }); @@ -287,7 +334,7 @@ describe('Server interceptors', () => { receiveHalfClose: 1, sendMetadata: 1, sendMessage: 1, - sendStatus: 1 + sendStatus: 1, }); done(); }); diff --git a/packages/grpc-js/test/test-server.ts b/packages/grpc-js/test/test-server.ts index 48b305ef..dbcdad46 100644 --- a/packages/grpc-js/test/test-server.ts +++ b/packages/grpc-js/test/test-server.ts @@ -149,14 +149,22 @@ describe('Server', () => { }); it('succeeds when called with an already bound port', done => { - server.bindAsync('localhost:0', ServerCredentials.createInsecure(), (err, port) => { - assert.ifError(err); - server.bindAsync(`localhost:${port}`, ServerCredentials.createInsecure(), (err2, port2) => { - assert.ifError(err2); - assert.strictEqual(port, port2); - done(); - }); - }); + server.bindAsync( + 'localhost:0', + ServerCredentials.createInsecure(), + (err, port) => { + assert.ifError(err); + server.bindAsync( + `localhost:${port}`, + ServerCredentials.createInsecure(), + (err2, port2) => { + assert.ifError(err2); + assert.strictEqual(port, port2); + done(); + } + ); + } + ); }); it('fails when called on a bound port with different credentials', done => { @@ -165,15 +173,19 @@ describe('Server', () => { [{ private_key: key, cert_chain: cert }], true ); - server.bindAsync('localhost:0', ServerCredentials.createInsecure(), (err, port) => { - assert.ifError(err); - server.bindAsync(`localhost:${port}`, secureCreds, (err2, port2) => { - assert(err2 !== null); - assert.match(err2.message, /credentials/); - done(); - }) - }); - }) + server.bindAsync( + 'localhost:0', + ServerCredentials.createInsecure(), + (err, port) => { + assert.ifError(err); + server.bindAsync(`localhost:${port}`, secureCreds, (err2, port2) => { + assert(err2 !== null); + assert.match(err2.message, /credentials/); + done(); + }); + } + ); + }); }); describe('unbind', () => { @@ -188,42 +200,73 @@ describe('Server', () => { assert.throws(() => { server.unbind('localhost:0'); }, /port 0/); - server.bindAsync('localhost:0', ServerCredentials.createInsecure(), (err, port) => { - assert.ifError(err); - assert.notStrictEqual(port, 0); - assert.throws(() => { - server.unbind('localhost:0'); - }, /port 0/); - done(); - }) + server.bindAsync( + 'localhost:0', + ServerCredentials.createInsecure(), + (err, port) => { + assert.ifError(err); + assert.notStrictEqual(port, 0); + assert.throws(() => { + server.unbind('localhost:0'); + }, /port 0/); + done(); + } + ); }); it('successfully unbinds a bound ephemeral port', done => { - server.bindAsync('localhost:0', ServerCredentials.createInsecure(), (err, port) => { - client = new grpc.Client(`localhost:${port}`, grpc.credentials.createInsecure()); - client.makeUnaryRequest('/math.Math/Div', x => x, x => x, Buffer.from('abc'), (callError1, result) => { - assert(callError1); - // UNIMPLEMENTED means that the request reached the call handling code - assert.strictEqual(callError1.code, grpc.status.UNIMPLEMENTED); - server.unbind(`localhost:${port}`); - const deadline = new Date(); - deadline.setSeconds(deadline.getSeconds() + 1); - client!.makeUnaryRequest('/math.Math/Div', x => x, x => x, Buffer.from('abc'), {deadline: deadline}, (callError2, result) => { - assert(callError2); - // DEADLINE_EXCEEDED means that the server is unreachable - assert(callError2.code === grpc.status.DEADLINE_EXCEEDED || callError2.code === grpc.status.UNAVAILABLE); - done(); - }); - }); - }) + server.bindAsync( + 'localhost:0', + ServerCredentials.createInsecure(), + (err, port) => { + client = new grpc.Client( + `localhost:${port}`, + grpc.credentials.createInsecure() + ); + client.makeUnaryRequest( + '/math.Math/Div', + x => x, + x => x, + Buffer.from('abc'), + (callError1, result) => { + assert(callError1); + // UNIMPLEMENTED means that the request reached the call handling code + assert.strictEqual(callError1.code, grpc.status.UNIMPLEMENTED); + server.unbind(`localhost:${port}`); + const deadline = new Date(); + deadline.setSeconds(deadline.getSeconds() + 1); + client!.makeUnaryRequest( + '/math.Math/Div', + x => x, + x => x, + Buffer.from('abc'), + { deadline: deadline }, + (callError2, result) => { + assert(callError2); + // DEADLINE_EXCEEDED means that the server is unreachable + assert( + callError2.code === grpc.status.DEADLINE_EXCEEDED || + callError2.code === grpc.status.UNAVAILABLE + ); + done(); + } + ); + } + ); + } + ); }); it('cancels a bindAsync in progress', done => { - server.bindAsync('localhost:50051', ServerCredentials.createInsecure(), (err, port) => { - assert(err); - assert.match(err.message, /cancelled by unbind/); - done(); - }); + server.bindAsync( + 'localhost:50051', + ServerCredentials.createInsecure(), + (err, port) => { + assert(err); + assert.match(err.message, /cancelled by unbind/); + done(); + } + ); server.unbind('localhost:50051'); }); }); @@ -282,7 +325,7 @@ describe('Server', () => { call.on('data', () => { server.drain(`localhost:${portNumber!}`, 100); }); - call.write({value: 'abc'}); + call.write({ value: 'abc' }); }); });