grpc-js: Run code formatter, fix one lint error

This commit is contained in:
Michael Lumish 2024-02-27 12:51:38 -08:00
parent 513a61a730
commit 6c2bc599e5
19 changed files with 698 additions and 354 deletions

View File

@ -106,7 +106,9 @@ export class BackoffTimeout {
private runTimer(delay: number) {
this.endTime = this.startTime;
this.endTime.setMilliseconds(this.endTime.getMilliseconds() + this.nextDelay);
this.endTime.setMilliseconds(
this.endTime.getMilliseconds() + this.nextDelay
);
clearTimeout(this.timerId);
this.timerId = setTimeout(() => {
this.callback();

View File

@ -183,7 +183,7 @@ export {
ServiceDefinition,
UntypedHandleCall,
UntypedServiceImplementation,
VerifyOptions
VerifyOptions,
};
/**** Server ****/
@ -263,7 +263,12 @@ export { getChannelzServiceDefinition, getChannelzHandlers } from './channelz';
export { addAdminServicesToServer } from './admin';
export { ServiceConfig, LoadBalancingConfig, MethodConfig, RetryPolicy } from './service-config';
export {
ServiceConfig,
LoadBalancingConfig,
MethodConfig,
RetryPolicy,
} from './service-config';
export {
ServerListener,
@ -274,7 +279,7 @@ export {
ResponderBuilder,
ServerInterceptingCallInterface,
ServerInterceptingCall,
ServerInterceptor
ServerInterceptor,
} from './server-interceptors';
import * as experimental from './experimental';

View File

@ -583,7 +583,8 @@ export class InternalChannel {
return;
}
const now = new Date();
const timeSinceLastActivity = now.valueOf() - this.lastActivityTimestamp.valueOf();
const timeSinceLastActivity =
now.valueOf() - this.lastActivityTimestamp.valueOf();
if (timeSinceLastActivity >= this.idleTimeoutMs) {
this.trace(
'Idle timer triggered after ' +
@ -603,7 +604,10 @@ export class InternalChannel {
}
private maybeStartIdleTimer() {
if (this.connectivityState !== ConnectivityState.SHUTDOWN && !this.idleTimer) {
if (
this.connectivityState !== ConnectivityState.SHUTDOWN &&
!this.idleTimer
) {
this.startIdleTimeout(this.idleTimeoutMs);
}
}

View File

@ -198,7 +198,12 @@ export class PickFirstLoadBalancer implements LoadBalancer {
keepaliveTime,
errorMessage
) => {
this.onSubchannelStateUpdate(subchannel, previousState, newState, errorMessage);
this.onSubchannelStateUpdate(
subchannel,
previousState,
newState,
errorMessage
);
};
private pickedSubchannelHealthListener: HealthListener = () =>
@ -275,7 +280,9 @@ export class PickFirstLoadBalancer implements LoadBalancer {
if (this.stickyTransientFailureMode) {
this.updateState(
ConnectivityState.TRANSIENT_FAILURE,
new UnavailablePicker({details: `No connection established. Last error: ${this.lastError}`})
new UnavailablePicker({
details: `No connection established. Last error: ${this.lastError}`,
})
);
} else {
this.updateState(ConnectivityState.CONNECTING, new QueuePicker(this));
@ -441,7 +448,12 @@ export class PickFirstLoadBalancer implements LoadBalancer {
private resetSubchannelList() {
for (const child of this.children) {
if (!(this.currentPick && child.subchannel.realSubchannelEquals(this.currentPick))) {
if (
!(
this.currentPick &&
child.subchannel.realSubchannelEquals(this.currentPick)
)
) {
/* The connectivity state listener is the same whether the subchannel
* is in the list of children or it is the currentPick, so if it is in
* both, removing it here would cause problems. In particular, that
@ -523,7 +535,10 @@ export class PickFirstLoadBalancer implements LoadBalancer {
}
exitIdle() {
if (this.currentState === ConnectivityState.IDLE && this.latestAddressList) {
if (
this.currentState === ConnectivityState.IDLE &&
this.latestAddressList
) {
this.connectToAddressList(this.latestAddressList);
}
}

View File

@ -156,7 +156,9 @@ export class RoundRobinLoadBalancer implements LoadBalancer {
) {
this.updateState(
ConnectivityState.TRANSIENT_FAILURE,
new UnavailablePicker({details: `No connection established. Last error: ${this.lastError}`})
new UnavailablePicker({
details: `No connection established. Last error: ${this.lastError}`,
})
);
} else {
this.updateState(ConnectivityState.IDLE, new QueuePicker(this));

View File

@ -145,7 +145,9 @@ export class LoadBalancingCall implements Call {
* metadata generation finished, we shouldn't do anything with
* it. */
if (this.ended) {
this.trace('Credentials metadata generation finished after call ended');
this.trace(
'Credentials metadata generation finished after call ended'
);
return;
}
finalMetadata.merge(credsMetadata);

View File

@ -112,7 +112,18 @@ export function trace(
text: string
): void {
if (isTracerEnabled(tracer)) {
log(severity, new Date().toISOString() + ' | v' + clientVersion + ' ' + pid + ' | ' + tracer + ' | ' + text);
log(
severity,
new Date().toISOString() +
' | v' +
clientVersion +
' ' +
pid +
' | ' +
tracer +
' | ' +
text
);
}
}

View File

@ -335,9 +335,14 @@ class DnsResolver implements Resolver {
if (this.pendingLookupPromise === null) {
if (this.isNextResolutionTimerRunning || this.backoff.isRunning()) {
if (this.isNextResolutionTimerRunning) {
trace('resolution update delayed by "min time between resolutions" rate limit');
trace(
'resolution update delayed by "min time between resolutions" rate limit'
);
} else {
trace('resolution update delayed by backoff timer until ' + this.backoff.getEndTime().toISOString());
trace(
'resolution update delayed by backoff timer until ' +
this.backoff.getEndTime().toISOString()
);
}
this.continueResolving = true;
} else {

View File

@ -223,8 +223,11 @@ export class ResolvingLoadBalancer implements LoadBalancer {
* In that case, the backoff timer callback will call
* updateResolution */
if (this.backoffTimeout.isRunning()) {
trace('requestReresolution delayed by backoff timer until ' + this.backoffTimeout.getEndTime().toISOString());
this.continueResolving = true;
trace(
'requestReresolution delayed by backoff timer until ' +
this.backoffTimeout.getEndTime().toISOString()
);
this.continueResolving = true;
} else {
this.updateResolution();
}

View File

@ -18,9 +18,7 @@
import { EventEmitter } from 'events';
import { Duplex, Readable, Writable } from 'stream';
import {
Status,
} from './constants';
import { Status } from './constants';
import { Deserialize, Serialize } from './make-client';
import { Metadata } from './metadata';
import { ObjectReadable, ObjectWritable } from './object-stream';
@ -56,11 +54,14 @@ export type ServerDuplexStream<RequestType, ResponseType> = ServerSurfaceCall &
ObjectReadable<RequestType> &
ObjectWritable<ResponseType> & { end: (metadata?: Metadata) => void };
export function serverErrorToStatus(error: ServerErrorResponse | ServerStatusResponse, overrideTrailers?: Metadata | undefined): PartialStatusObject {
export function serverErrorToStatus(
error: ServerErrorResponse | ServerStatusResponse,
overrideTrailers?: Metadata | undefined
): PartialStatusObject {
const status: PartialStatusObject = {
code: Status.UNKNOWN,
details: 'message' in error ? error.message : 'Unknown Error',
metadata: overrideTrailers ?? error.metadata ?? null
metadata: overrideTrailers ?? error.metadata ?? null,
};
if (
@ -154,7 +155,7 @@ export class ServerWritableStreamImpl<RequestType, ResponseType>
private trailingMetadata: Metadata;
private pendingStatus: PartialStatusObject = {
code: Status.OK,
details: 'OK'
details: 'OK',
};
constructor(
@ -224,7 +225,7 @@ export class ServerDuplexStreamImpl<RequestType, ResponseType>
private trailingMetadata: Metadata;
private pendingStatus: PartialStatusObject = {
code: Status.OK,
details: 'OK'
details: 'OK',
};
constructor(

View File

@ -15,19 +15,24 @@
*
*/
import { PartialStatusObject} from "./call-interface";
import { ServerMethodDefinition } from "./make-client";
import { Metadata } from "./metadata";
import { ChannelOptions } from "./channel-options";
import { Handler, ServerErrorResponse } from "./server-call";
import { Deadline } from "./deadline";
import { DEFAULT_MAX_RECEIVE_MESSAGE_LENGTH, DEFAULT_MAX_SEND_MESSAGE_LENGTH, LogVerbosity, Status } from "./constants";
import { PartialStatusObject } from './call-interface';
import { ServerMethodDefinition } from './make-client';
import { Metadata } from './metadata';
import { ChannelOptions } from './channel-options';
import { Handler, ServerErrorResponse } from './server-call';
import { Deadline } from './deadline';
import {
DEFAULT_MAX_RECEIVE_MESSAGE_LENGTH,
DEFAULT_MAX_SEND_MESSAGE_LENGTH,
LogVerbosity,
Status,
} from './constants';
import * as http2 from 'http2';
import { getErrorMessage } from "./error";
import { getErrorMessage } from './error';
import * as zlib from 'zlib';
import { promisify } from "util";
import { StreamDecoder } from "./stream-decoder";
import { CallEventTracker } from "./transport";
import { promisify } from 'util';
import { StreamDecoder } from './stream-decoder';
import { CallEventTracker } from './transport';
import * as logging from './logging';
const unzip = promisify(zlib.unzip);
@ -96,7 +101,7 @@ export class ServerListenerBuilder {
onReceiveMetadata: this.metadata,
onReceiveMessage: this.message,
onReceiveHalfClose: this.halfClose,
onCancel: this.cancel
onCancel: this.cancel,
};
}
}
@ -109,22 +114,30 @@ export interface InterceptingServerListener {
onCancel(): void;
}
export function isInterceptingServerListener(listener: ServerListener | InterceptingServerListener): listener is InterceptingServerListener {
return listener.onReceiveMetadata !== undefined && listener.onReceiveMetadata.length === 1;
export function isInterceptingServerListener(
listener: ServerListener | InterceptingServerListener
): listener is InterceptingServerListener {
return (
listener.onReceiveMetadata !== undefined &&
listener.onReceiveMetadata.length === 1
);
}
class InterceptingServerListenerImpl implements InterceptingServerListener {
/**
* Once the call is cancelled, ignore all other events.
*/
private cancelled: boolean = false;
private processingMetadata: boolean = false;
private hasPendingMessage: boolean = false;
private cancelled = false;
private processingMetadata = false;
private hasPendingMessage = false;
private pendingMessage: any = null;
private processingMessage: boolean = false;
private hasPendingHalfClose: boolean = false;
private processingMessage = false;
private hasPendingHalfClose = false;
constructor(private listener: FullServerListener, private nextListener: InterceptingServerListener) {}
constructor(
private listener: FullServerListener,
private nextListener: InterceptingServerListener
) {}
private processPendingMessage() {
if (this.hasPendingMessage) {
@ -195,7 +208,6 @@ class InterceptingServerListenerImpl implements InterceptingServerListener {
this.listener.onCancel();
this.nextListener.onCancel();
}
}
export interface StartResponder {
@ -212,7 +224,10 @@ export interface MessageResponder {
}
export interface StatusResponder {
(status: PartialStatusObject, next: (status: PartialStatusObject) => void): void;
(
status: PartialStatusObject,
next: (status: PartialStatusObject) => void
): void;
}
export interface FullResponder {
@ -255,7 +270,7 @@ export class ResponderBuilder {
start: this.start,
sendMetadata: this.metadata,
sendMessage: this.message,
sendStatus: this.status
sendStatus: this.status,
};
}
}
@ -270,11 +285,11 @@ const defaultServerListener: FullServerListener = {
onReceiveHalfClose: next => {
next();
},
onCancel: () => {}
onCancel: () => {},
};
const defaultResponder: FullResponder = {
start: (next) => {
start: next => {
next();
},
sendMetadata: (metadata, next) => {
@ -285,7 +300,7 @@ const defaultResponder: FullResponder = {
},
sendStatus: (status, next) => {
next(status);
}
},
};
export interface ServerInterceptingCallInterface {
@ -321,18 +336,24 @@ export interface ServerInterceptingCallInterface {
export class ServerInterceptingCall implements ServerInterceptingCallInterface {
private responder: FullResponder;
private processingMetadata: boolean = false;
private processingMessage: boolean = false;
private processingMetadata = false;
private processingMessage = false;
private pendingMessage: any = null;
private pendingMessageCallback: (() => void) | null = null;
private pendingStatus: PartialStatusObject | null = null;
constructor(private nextCall: ServerInterceptingCallInterface, responder?: Responder) {
this.responder = {...defaultResponder, ...responder};
constructor(
private nextCall: ServerInterceptingCallInterface,
responder?: Responder
) {
this.responder = { ...defaultResponder, ...responder };
}
private processPendingMessage() {
if (this.pendingMessageCallback) {
this.nextCall.sendMessage(this.pendingMessage, this.pendingMessageCallback);
this.nextCall.sendMessage(
this.pendingMessage,
this.pendingMessageCallback
);
this.pendingMessage = null;
this.pendingMessageCallback = null;
}
@ -347,8 +368,14 @@ export class ServerInterceptingCall implements ServerInterceptingCallInterface {
start(listener: InterceptingServerListener): void {
this.responder.start(interceptedListener => {
const fullInterceptedListener: FullServerListener = {...defaultServerListener, ...interceptedListener};
const finalInterceptingListener = new InterceptingServerListenerImpl(fullInterceptedListener, listener);
const fullInterceptedListener: FullServerListener = {
...defaultServerListener,
...interceptedListener,
};
const finalInterceptingListener = new InterceptingServerListenerImpl(
fullInterceptedListener,
listener
);
this.nextCall.start(finalInterceptingListener);
});
}
@ -394,7 +421,10 @@ export class ServerInterceptingCall implements ServerInterceptingCallInterface {
}
export interface ServerInterceptor {
(methodDescriptor: ServerMethodDefinition<any, any>, call: ServerInterceptingCallInterface): ServerInterceptingCall;
(
methodDescriptor: ServerMethodDefinition<any, any>,
call: ServerInterceptingCallInterface
): ServerInterceptingCall;
}
interface DeadlineUnitIndexSignature {
@ -438,7 +468,9 @@ interface ReadQueueEntry {
parsedMessage: any;
}
export class BaseServerInterceptingCall implements ServerInterceptingCallInterface {
export class BaseServerInterceptingCall
implements ServerInterceptingCallInterface
{
private listener: InterceptingServerListener | null = null;
private metadata: Metadata;
private deadlineTimer: NodeJS.Timeout | null = null;
@ -449,7 +481,7 @@ export class BaseServerInterceptingCall implements ServerInterceptingCallInterfa
private metadataSent = false;
private wantTrailers = false;
private cancelNotified = false;
private incomingEncoding: string = 'identity';
private incomingEncoding = 'identity';
private decoder = new StreamDecoder();
private readQueue: ReadQueueEntry[] = [];
private isReadPending = false;
@ -485,7 +517,7 @@ export class BaseServerInterceptingCall implements ServerInterceptingCallInterfa
this.callEventTracker.onCallEnd({
code: Status.CANCELLED,
details: 'Stream closed before sending status',
metadata: null
metadata: null,
});
}
@ -548,7 +580,7 @@ export class BaseServerInterceptingCall implements ServerInterceptingCallInterfa
const status: PartialStatusObject = {
code: Status.INTERNAL,
details: `Invalid ${GRPC_TIMEOUT_HEADER} value "${timeoutHeader}"`,
metadata: null
metadata: null,
};
// Wait for the constructor to complete before sending the error.
process.nextTick(() => {
@ -565,11 +597,10 @@ export class BaseServerInterceptingCall implements ServerInterceptingCallInterfa
const status: PartialStatusObject = {
code: Status.DEADLINE_EXCEEDED,
details: 'Deadline exceeded',
metadata: null
metadata: null,
};
this.sendStatus(status);
}, timeout);
}
private checkCancelled(): boolean {
@ -650,14 +681,19 @@ export class BaseServerInterceptingCall implements ServerInterceptingCallInterfa
}
const compressed = queueEntry.compressedMessage!.readUInt8(0) === 1;
const compressedMessageEncoding = compressed ? this.incomingEncoding : 'identity';
const decompressedMessage = await this.decompressMessage(queueEntry.compressedMessage!, compressedMessageEncoding);
const compressedMessageEncoding = compressed
? this.incomingEncoding
: 'identity';
const decompressedMessage = await this.decompressMessage(
queueEntry.compressedMessage!,
compressedMessageEncoding
);
try {
queueEntry.parsedMessage = this.handler.deserialize(decompressedMessage);
} catch (err) {
this.sendStatus({
code: Status.INTERNAL,
details: `Error deserializing request: ${(err as Error).message}`
details: `Error deserializing request: ${(err as Error).message}`,
});
return;
}
@ -666,7 +702,12 @@ export class BaseServerInterceptingCall implements ServerInterceptingCallInterfa
}
private maybePushNextMessage() {
if (this.listener && this.isReadPending && this.readQueue.length > 0 && this.readQueue[0].type !== 'COMPRESSED') {
if (
this.listener &&
this.isReadPending &&
this.readQueue.length > 0 &&
this.readQueue[0].type !== 'COMPRESSED'
) {
this.isReadPending = false;
const nextQueueEntry = this.readQueue.shift()!;
if (nextQueueEntry.type === 'READABLE') {
@ -682,23 +723,33 @@ export class BaseServerInterceptingCall implements ServerInterceptingCallInterfa
if (this.checkCancelled()) {
return;
}
trace('Request to ' + this.handler.path + ' received data frame of size ' + data.length);
trace(
'Request to ' +
this.handler.path +
' received data frame of size ' +
data.length
);
const rawMessages = this.decoder.write(data);
for (const messageBytes of rawMessages) {
this.stream.pause();
if (this.maxReceiveMessageSize !== -1 && messageBytes.length - 5 > this.maxReceiveMessageSize) {
if (
this.maxReceiveMessageSize !== -1 &&
messageBytes.length - 5 > this.maxReceiveMessageSize
) {
this.sendStatus({
code: Status.RESOURCE_EXHAUSTED,
details: `Received message larger than max (${messageBytes.length - 5} vs. ${this.maxReceiveMessageSize})`,
metadata: null
details: `Received message larger than max (${
messageBytes.length - 5
} vs. ${this.maxReceiveMessageSize})`,
metadata: null,
});
return;
}
const queueEntry: ReadQueueEntry = {
type: 'COMPRESSED',
compressedMessage: messageBytes,
parsedMessage: null
parsedMessage: null,
};
this.readQueue.push(queueEntry);
this.decompressAndMaybePush(queueEntry);
@ -709,7 +760,7 @@ export class BaseServerInterceptingCall implements ServerInterceptingCallInterfa
this.readQueue.push({
type: 'HALF_CLOSE',
compressedMessage: null,
parsedMessage: null
parsedMessage: null,
});
this.receivedHalfClose = true;
this.maybePushNextMessage();
@ -751,7 +802,7 @@ export class BaseServerInterceptingCall implements ServerInterceptingCallInterfa
this.sendStatus({
code: Status.INTERNAL,
details: `Error serializing response: ${getErrorMessage(e)}`,
metadata: null
metadata: null,
});
return;
}
@ -763,18 +814,23 @@ export class BaseServerInterceptingCall implements ServerInterceptingCallInterfa
this.sendStatus({
code: Status.RESOURCE_EXHAUSTED,
details: `Sent message larger than max (${response.length} vs. ${this.maxSendMessageSize})`,
metadata: null
metadata: null,
});
return;
}
this.maybeSendMetadata();
trace('Request to ' + this.handler.path + ' sent data frame of size ' + response.length);
trace(
'Request to ' +
this.handler.path +
' sent data frame of size ' +
response.length
);
this.stream.write(response, error => {
if (error) {
this.sendStatus({
code: Status.INTERNAL,
details: `Error writing message: ${getErrorMessage(error)}`,
metadata: null
metadata: null,
});
return;
}
@ -871,16 +927,24 @@ export function getServerInterceptingCall(
handler: Handler<any, any>,
options: ChannelOptions
) {
const methodDefinition: ServerMethodDefinition<any, any> = {
path: handler.path,
requestStream: handler.type === 'clientStream' || handler.type === 'bidi',
responseStream: handler.type === 'serverStream' || handler.type === 'bidi',
requestDeserialize: handler.deserialize,
responseSerialize: handler.serialize
}
const baseCall = new BaseServerInterceptingCall(stream, headers, callEventTracker, handler, options);
return interceptors.reduce((call: ServerInterceptingCallInterface, interceptor: ServerInterceptor) => {
return interceptor(methodDefinition, call);
}, baseCall);
responseSerialize: handler.serialize,
};
const baseCall = new BaseServerInterceptingCall(
stream,
headers,
callEventTracker,
handler,
options
);
return interceptors.reduce(
(call: ServerInterceptingCallInterface, interceptor: ServerInterceptor) => {
return interceptor(methodDefinition, call);
},
baseCall
);
}

View File

@ -55,7 +55,13 @@ import {
subchannelAddressToString,
stringToSubchannelAddress,
} from './subchannel-address';
import { GrpcUri, combineHostPort, parseUri, splitHostPort, uriToString } from './uri-parser';
import {
GrpcUri,
combineHostPort,
parseUri,
splitHostPort,
uriToString,
} from './uri-parser';
import {
ChannelzCallTracker,
ChannelzChildrenTracker,
@ -70,7 +76,11 @@ import {
unregisterChannelzRef,
} from './channelz';
import { CipherNameAndProtocol, TLSSocket } from 'tls';
import { ServerInterceptingCallInterface, ServerInterceptor, getServerInterceptingCall } from './server-interceptors';
import {
ServerInterceptingCallInterface,
ServerInterceptor,
getServerInterceptingCall,
} from './server-interceptors';
import { PartialStatusObject } from './call-interface';
import { CallEventTracker } from './transport';
@ -103,9 +113,15 @@ function noop(): void {}
* @returns
*/
function deprecate(message: string) {
return function <This, Args extends any[], Return>(target: (this: This, ...args: Args) => Return, context: ClassMethodDecoratorContext<This, (this: This, ...args: Args) => Return>) {
return function <This, Args extends any[], Return>(
target: (this: This, ...args: Args) => Return,
context: ClassMethodDecoratorContext<
This,
(this: This, ...args: Args) => Return
>
) {
return util.deprecate(target, message);
}
};
}
function getUnimplementedStatusResponse(
@ -209,7 +225,7 @@ interface BoundPort {
* that expands to multiple addresses will result in multiple listening
* servers.
*/
listeningServers: Set<AnyHttp2Server>
listeningServers: Set<AnyHttp2Server>;
}
/**
@ -221,11 +237,11 @@ interface Http2ServerInfo {
}
export interface ServerOptions extends ChannelOptions {
interceptors?: ServerInterceptor[]
interceptors?: ServerInterceptor[];
}
export class Server {
private boundPorts: Map<string, BoundPort>= new Map();
private boundPorts: Map<string, BoundPort> = new Map();
private http2Servers: Map<AnyHttp2Server, Http2ServerInfo> = new Map();
private handlers: Map<string, UntypedHandler> = new Map<
@ -526,10 +542,11 @@ export class Server {
return http2Server;
}
private bindOneAddress(address: SubchannelAddress, boundPortObject: BoundPort): Promise<SingleAddressBindResult> {
this.trace(
'Attempting to bind ' + subchannelAddressToString(address)
);
private bindOneAddress(
address: SubchannelAddress,
boundPortObject: BoundPort
): Promise<SingleAddressBindResult> {
this.trace('Attempting to bind ' + subchannelAddressToString(address));
const http2Server = this.createHttp2Server(boundPortObject.credentials);
return new Promise<SingleAddressBindResult>((resolve, reject) => {
const onError = (err: Error) => {
@ -541,7 +558,7 @@ export class Server {
);
resolve({
port: 'port' in address ? address.port : 1,
error: err.message
error: err.message,
});
};
@ -561,13 +578,15 @@ export class Server {
};
}
const channelzRef = this.registerListenerToChannelz(boundSubchannelAddress);
const channelzRef = this.registerListenerToChannelz(
boundSubchannelAddress
);
if (this.channelzEnabled) {
this.listenerChildrenTracker.refChild(channelzRef);
}
this.http2Servers.set(http2Server, {
channelzRef: channelzRef,
sessions: new Set()
sessions: new Set(),
});
boundPortObject.listeningServers.add(http2Server);
this.trace(
@ -575,62 +594,86 @@ export class Server {
subchannelAddressToString(boundSubchannelAddress)
);
resolve({
port: 'port' in boundSubchannelAddress
? boundSubchannelAddress.port
: 1
port:
'port' in boundSubchannelAddress ? boundSubchannelAddress.port : 1,
});
http2Server.removeListener('error', onError);
});
});
}
private async bindManyPorts(addressList: SubchannelAddress[], boundPortObject: BoundPort): Promise<BindResult> {
private async bindManyPorts(
addressList: SubchannelAddress[],
boundPortObject: BoundPort
): Promise<BindResult> {
if (addressList.length === 0) {
return {
count: 0,
port: 0,
errors: []
errors: [],
};
}
if (isTcpSubchannelAddress(addressList[0]) && addressList[0].port === 0) {
/* If binding to port 0, first try to bind the first address, then bind
* the rest of the address list to the specific port that it binds. */
const firstAddressResult = await this.bindOneAddress(addressList[0], boundPortObject);
const firstAddressResult = await this.bindOneAddress(
addressList[0],
boundPortObject
);
if (firstAddressResult.error) {
/* If the first address fails to bind, try the same operation starting
* from the second item in the list. */
const restAddressResult = await this.bindManyPorts(addressList.slice(1), boundPortObject);
const restAddressResult = await this.bindManyPorts(
addressList.slice(1),
boundPortObject
);
return {
...restAddressResult,
errors: [firstAddressResult.error, ...restAddressResult.errors]
errors: [firstAddressResult.error, ...restAddressResult.errors],
};
} else {
const restAddresses = addressList.slice(1).map(address => isTcpSubchannelAddress(address) ? {host: address.host, port: firstAddressResult.port} : address)
const restAddressResult = await Promise.all(restAddresses.map(address => this.bindOneAddress(address, boundPortObject)));
const restAddresses = addressList
.slice(1)
.map(address =>
isTcpSubchannelAddress(address)
? { host: address.host, port: firstAddressResult.port }
: address
);
const restAddressResult = await Promise.all(
restAddresses.map(address =>
this.bindOneAddress(address, boundPortObject)
)
);
const allResults = [firstAddressResult, ...restAddressResult];
return {
count: allResults.filter(result => result.error === undefined).length,
port: firstAddressResult.port,
errors: allResults.filter(result => result.error).map(result => result.error!)
errors: allResults
.filter(result => result.error)
.map(result => result.error!),
};
}
} else {
const allResults = await Promise.all(addressList.map(address => this.bindOneAddress(address, boundPortObject)));
const allResults = await Promise.all(
addressList.map(address =>
this.bindOneAddress(address, boundPortObject)
)
);
return {
count: allResults.filter(result => result.error === undefined).length,
port: allResults[0].port,
errors: allResults.filter(result => result.error).map(result => result.error!)
errors: allResults
.filter(result => result.error)
.map(result => result.error!),
};
}
}
private async bindAddressList(addressList: SubchannelAddress[], boundPortObject: BoundPort): Promise<number> {
let bindResult: BindResult;
try {
bindResult = await this.bindManyPorts(addressList, boundPortObject);
} catch (error) {
throw error;
}
private async bindAddressList(
addressList: SubchannelAddress[],
boundPortObject: BoundPort
): Promise<number> {
const bindResult = await this.bindManyPorts(addressList, boundPortObject);
if (bindResult.count > 0) {
if (bindResult.count < addressList.length) {
logging.log(
@ -642,7 +685,9 @@ export class Server {
} else {
const errorString = `No address added out of total ${addressList.length} resolved`;
logging.log(LogVerbosity.ERROR, errorString);
throw new Error(`${errorString} errors: [${bindResult.errors.join(',')}]`);
throw new Error(
`${errorString} errors: [${bindResult.errors.join(',')}]`
);
}
}
@ -660,9 +705,7 @@ export class Server {
...endpointList.map(endpoint => endpoint.addresses)
);
if (addressList.length === 0) {
reject(
new Error(`No addresses resolved for port ${port}`)
);
reject(new Error(`No addresses resolved for port ${port}`));
return;
}
resolve(addressList);
@ -676,7 +719,10 @@ export class Server {
});
}
private async bindPort(port: GrpcUri, boundPortObject: BoundPort): Promise<number> {
private async bindPort(
port: GrpcUri,
boundPortObject: BoundPort
): Promise<number> {
const addressList = await this.resolvePort(port);
if (boundPortObject.cancelled) {
this.completeUnbind(boundPortObject);
@ -691,7 +737,6 @@ export class Server {
}
private normalizePort(port: string): GrpcUri {
const initialPortUri = parseUri(port);
if (initialPortUri === null) {
throw new Error(`Could not parse port "${port}"`);
@ -736,14 +781,20 @@ export class Server {
let boundPortObject = this.boundPorts.get(uriToString(portUri));
if (boundPortObject) {
if (!creds._equals(boundPortObject.credentials)) {
deferredCallback(new Error(`${port} already bound with incompatible credentials`), 0);
deferredCallback(
new Error(`${port} already bound with incompatible credentials`),
0
);
return;
}
/* If that operation has previously been cancelled by an unbind call,
* uncancel it. */
boundPortObject.cancelled = false;
if (boundPortObject.completionPromise) {
boundPortObject.completionPromise.then(portNum => callback(null, portNum), error => callback(error as Error, 0));
boundPortObject.completionPromise.then(
portNum => callback(null, portNum),
error => callback(error as Error, 0)
);
} else {
deferredCallback(null, boundPortObject.portNumber);
}
@ -756,7 +807,7 @@ export class Server {
cancelled: false,
portNumber: 0,
credentials: creds,
listeningServers: new Set()
listeningServers: new Set(),
};
const splitPort = splitHostPort(portUri.path);
const completionPromise = this.bindPort(portUri, boundPortObject);
@ -765,34 +816,42 @@ export class Server {
* bind operation completes and we have a specific port number. Otherwise,
* populate it immediately. */
if (splitPort?.port === 0) {
completionPromise.then(portNum => {
const finalUri: GrpcUri = {
scheme: portUri.scheme,
authority: portUri.authority,
path: combineHostPort({host: splitPort.host, port: portNum})
};
boundPortObject!.mapKey = uriToString(finalUri);
boundPortObject!.completionPromise = null;
boundPortObject!.portNumber = portNum;
this.boundPorts.set(boundPortObject!.mapKey, boundPortObject!);
callback(null, portNum);
}, error => {
callback(error, 0);
})
completionPromise.then(
portNum => {
const finalUri: GrpcUri = {
scheme: portUri.scheme,
authority: portUri.authority,
path: combineHostPort({ host: splitPort.host, port: portNum }),
};
boundPortObject!.mapKey = uriToString(finalUri);
boundPortObject!.completionPromise = null;
boundPortObject!.portNumber = portNum;
this.boundPorts.set(boundPortObject!.mapKey, boundPortObject!);
callback(null, portNum);
},
error => {
callback(error, 0);
}
);
} else {
this.boundPorts.set(boundPortObject.mapKey, boundPortObject);
completionPromise.then(portNum => {
boundPortObject!.completionPromise = null;
boundPortObject!.portNumber = portNum;
callback(null, portNum);
}, error => {
callback(error, 0);
});
completionPromise.then(
portNum => {
boundPortObject!.completionPromise = null;
boundPortObject!.portNumber = portNum;
callback(null, portNum);
},
error => {
callback(error, 0);
}
);
}
}
private closeServer(server: AnyHttp2Server, callback?: () => void) {
this.trace('Closing server with address ' + JSON.stringify(server.address()));
this.trace(
'Closing server with address ' + JSON.stringify(server.address())
);
const serverInfo = this.http2Servers.get(server);
server.close(() => {
if (this.channelzEnabled && serverInfo) {
@ -802,10 +861,12 @@ export class Server {
this.http2Servers.delete(server);
callback?.();
});
}
private closeSession(session: http2.ServerHttp2Session, callback?: () => void) {
private closeSession(
session: http2.ServerHttp2Session,
callback?: () => void
) {
this.trace('Closing session initiated by ' + session.socket?.remoteAddress);
const sessionInfo = this.sessions.get(session);
const closeCallback = () => {
@ -854,7 +915,12 @@ export class Server {
}
const boundPortObject = this.boundPorts.get(uriToString(portUri));
if (boundPortObject) {
this.trace('unbinding ' + boundPortObject.mapKey + ' originally bound as ' + uriToString(boundPortObject.originalUri));
this.trace(
'unbinding ' +
boundPortObject.mapKey +
' originally bound as ' +
uriToString(boundPortObject.originalUri)
);
/* If the bind operation is pending, the cancelled flag will trigger
* the unbind operation later. */
if (boundPortObject.completionPromise) {
@ -964,13 +1030,13 @@ export class Server {
/**
* @deprecated No longer needed as of version 1.10.x
*/
@deprecate('Calling start() is no longer necessary. It can be safely omitted.')
@deprecate(
'Calling start() is no longer necessary. It can be safely omitted.'
)
start(): void {
if (
this.http2Servers.size === 0 ||
[...this.http2Servers.keys()].every(
server => !server.listening
)
[...this.http2Servers.keys()].every(server => !server.listening)
) {
throw new Error('server must be bound in order to start');
}
@ -1090,9 +1156,9 @@ export class Server {
'grpc-message': err.details,
[http2.constants.HTTP2_HEADER_STATUS]: http2.constants.HTTP_STATUS_OK,
[http2.constants.HTTP2_HEADER_CONTENT_TYPE]: 'application/grpc+proto',
...err.metadata?.toHttp2Headers()
...err.metadata?.toHttp2Headers(),
};
stream.respond(trailersToSend, {endStream: true});
stream.respond(trailersToSend, { endStream: true });
if (this.channelzEnabled) {
this.callTracker.addCallFailed();
@ -1129,7 +1195,7 @@ export class Server {
return;
}
let callEventTracker: CallEventTracker = {
const callEventTracker: CallEventTracker = {
addMessageSent: () => {
if (channelzSessionInfo) {
channelzSessionInfo.messagesSent += 1;
@ -1157,10 +1223,17 @@ export class Server {
channelzSessionInfo.streamTracker.addCallFailed();
}
}
}
}
},
};
const call = getServerInterceptingCall(this.interceptors, stream, headers, callEventTracker, handler, this.options);
const call = getServerInterceptingCall(
this.interceptors,
stream,
headers,
callEventTracker,
handler,
this.options
);
if (!this._runHandlerForCall(call, handler)) {
this.callTracker.addCallFailed();
@ -1193,7 +1266,14 @@ export class Server {
return;
}
const call = getServerInterceptingCall(this.interceptors, stream, headers, null, handler, this.options);
const call = getServerInterceptingCall(
this.interceptors,
stream,
headers,
null,
handler,
this.options
);
if (!this._runHandlerForCall(call, handler)) {
call.sendStatus({
@ -1207,25 +1287,15 @@ export class Server {
call: ServerInterceptingCallInterface,
handler: Handler<any, any>
): boolean {
const { type } = handler;
if (type === 'unary') {
handleUnary(call, handler as UntypedUnaryHandler);
} else if (type === 'clientStream') {
handleClientStreaming(
call,
handler as UntypedClientStreamingHandler
);
handleClientStreaming(call, handler as UntypedClientStreamingHandler);
} else if (type === 'serverStream') {
handleServerStreaming(
call,
handler as UntypedServerStreamingHandler
);
handleServerStreaming(call, handler as UntypedServerStreamingHandler);
} else if (type === 'bidi') {
handleBidiStreaming(
call,
handler as UntypedBidiStreamingHandler
);
handleBidiStreaming(call, handler as UntypedBidiStreamingHandler);
} else {
return false;
}
@ -1257,7 +1327,6 @@ export class Server {
http2Server.on('stream', handler.bind(this));
http2Server.on('session', session => {
const channelzRef = registerChannelzSocket(
session.socket.remoteAddress ?? 'unknown',
this.getChannelzSessionInfoGetter(session),
@ -1388,7 +1457,7 @@ async function handleUnary<RequestType, ResponseType>(
call.sendStatus({
code: Status.OK,
details: 'OK',
metadata: trailer ?? null
metadata: trailer ?? null,
});
});
}
@ -1405,7 +1474,7 @@ async function handleUnary<RequestType, ResponseType>(
call.sendStatus({
code: Status.UNIMPLEMENTED,
details: `Received a second request message for server streaming method ${handler.path}`,
metadata: null
metadata: null,
});
return;
}
@ -1417,18 +1486,25 @@ async function handleUnary<RequestType, ResponseType>(
call.sendStatus({
code: Status.UNIMPLEMENTED,
details: `Received no request message for server streaming method ${handler.path}`,
metadata: null
metadata: null,
});
return;
}
stream = new ServerWritableStreamImpl(handler.path, call, requestMetadata, requestMessage);
stream = new ServerWritableStreamImpl(
handler.path,
call,
requestMetadata,
requestMessage
);
try {
handler.func(stream, respond);
} catch (err) {
call.sendStatus({
code: Status.UNKNOWN,
details: `Server method handler threw error ${(err as Error).message}`,
metadata: null
details: `Server method handler threw error ${
(err as Error).message
}`,
metadata: null,
});
}
},
@ -1461,7 +1537,7 @@ function handleClientStreaming<RequestType, ResponseType>(
call.sendStatus({
code: Status.OK,
details: 'OK',
metadata: trailer ?? null
metadata: trailer ?? null,
});
});
}
@ -1474,8 +1550,10 @@ function handleClientStreaming<RequestType, ResponseType>(
} catch (err) {
call.sendStatus({
code: Status.UNKNOWN,
details: `Server method handler threw error ${(err as Error).message}`,
metadata: null
details: `Server method handler threw error ${
(err as Error).message
}`,
metadata: null,
});
}
},
@ -1513,7 +1591,7 @@ function handleServerStreaming<RequestType, ResponseType>(
call.sendStatus({
code: Status.UNIMPLEMENTED,
details: `Received a second request message for server streaming method ${handler.path}`,
metadata: null
metadata: null,
});
return;
}
@ -1525,18 +1603,25 @@ function handleServerStreaming<RequestType, ResponseType>(
call.sendStatus({
code: Status.UNIMPLEMENTED,
details: `Received no request message for server streaming method ${handler.path}`,
metadata: null
metadata: null,
});
return;
}
stream = new ServerWritableStreamImpl(handler.path, call, requestMetadata, requestMessage);
stream = new ServerWritableStreamImpl(
handler.path,
call,
requestMetadata,
requestMessage
);
try {
handler.func(stream);
} catch (err) {
call.sendStatus({
code: Status.UNKNOWN,
details: `Server method handler threw error ${(err as Error).message}`,
metadata: null
details: `Server method handler threw error ${
(err as Error).message
}`,
metadata: null,
});
}
},
@ -1564,8 +1649,10 @@ function handleBidiStreaming<RequestType, ResponseType>(
} catch (err) {
call.sendStatus({
code: Status.UNKNOWN,
details: `Server method handler threw error ${(err as Error).message}`,
metadata: null
details: `Server method handler threw error ${
(err as Error).message
}`,
metadata: null,
});
}
},

View File

@ -356,17 +356,23 @@ export function validateRetryThrottling(obj: any): RetryThrottling {
function validateLoadBalancingConfig(obj: any): LoadBalancingConfig {
if (!(typeof obj === 'object' && obj !== null)) {
throw new Error(`Invalid loadBalancingConfig: unexpected type ${typeof obj}`);
throw new Error(
`Invalid loadBalancingConfig: unexpected type ${typeof obj}`
);
}
const keys = Object.keys(obj);
if (keys.length > 1) {
throw new Error(`Invalid loadBalancingConfig: unexpected multiple keys ${keys}`);
throw new Error(
`Invalid loadBalancingConfig: unexpected multiple keys ${keys}`
);
}
if (keys.length === 0) {
throw new Error('Invalid loadBalancingConfig: load balancing policy name required');
throw new Error(
'Invalid loadBalancingConfig: load balancing policy name required'
);
}
return {
[keys[0]]: obj[keys[0]]
[keys[0]]: obj[keys[0]],
};
}
@ -385,7 +391,6 @@ export function validateServiceConfig(obj: any): ServiceConfig {
if ('loadBalancingConfig' in obj) {
if (Array.isArray(obj.loadBalancingConfig)) {
for (const config of obj.loadBalancingConfig) {
result.loadBalancingConfig.push(validateLoadBalancingConfig(config));
}
} else {

View File

@ -205,7 +205,12 @@ class Http2Transport implements Transport {
) {
tooManyPings = true;
}
this.trace('connection closed by GOAWAY with code ' + errorCode + ' and data ' + opaqueData?.toString());
this.trace(
'connection closed by GOAWAY with code ' +
errorCode +
' and data ' +
opaqueData?.toString()
);
this.reportDisconnectToOwner(tooManyPings);
}
);

View File

@ -129,7 +129,10 @@ export class TestClient {
this.client.echo({}, callback);
}
sendRequestWithMetadata(metadata: grpc.Metadata, callback: (error?: grpc.ServiceError) => void) {
sendRequestWithMetadata(
metadata: grpc.Metadata,
callback: (error?: grpc.ServiceError) => void
) {
this.client.echo({}, metadata, callback);
}

View File

@ -28,7 +28,7 @@ import parseLoadBalancingConfig = experimental.parseLoadBalancingConfig;
*/
interface TestCase {
name: string;
input: object,
input: object;
output?: object;
error?: RegExp;
}
@ -40,52 +40,52 @@ interface TestCase {
* Note: some tests have an expected output that is different from the output,
* but all non-error tests additionally verify that parsing the output again
* produces the same output. */
const allTestCases: {[lbPolicyName: string]: TestCase[]} = {
const allTestCases: { [lbPolicyName: string]: TestCase[] } = {
pick_first: [
{
name: 'no fields set',
input: {},
output: {
shuffleAddressList: false
}
shuffleAddressList: false,
},
},
{
name: 'shuffleAddressList set',
input: {
shuffleAddressList: true
}
}
shuffleAddressList: true,
},
},
],
round_robin: [
{
name: 'no fields set',
input: {}
}
input: {},
},
],
outlier_detection: [
{
name: 'only required fields set',
input: {
child_policy: [{round_robin: {}}]
child_policy: [{ round_robin: {} }],
},
output: {
interval: {
seconds: 10,
nanos: 0
nanos: 0,
},
base_ejection_time: {
seconds: 30,
nanos: 0
nanos: 0,
},
max_ejection_time: {
seconds: 300,
nanos: 0
nanos: 0,
},
max_ejection_percent: 10,
success_rate_ejection: undefined,
failure_percentage_ejection: undefined,
child_policy: [{round_robin: {}}]
}
child_policy: [{ round_robin: {} }],
},
},
{
name: 'all optional fields undefined',
@ -96,53 +96,53 @@ const allTestCases: {[lbPolicyName: string]: TestCase[]} = {
max_ejection_percent: undefined,
success_rate_ejection: undefined,
failure_percentage_ejection: undefined,
child_policy: [{round_robin: {}}]
child_policy: [{ round_robin: {} }],
},
output: {
interval: {
seconds: 10,
nanos: 0
nanos: 0,
},
base_ejection_time: {
seconds: 30,
nanos: 0
nanos: 0,
},
max_ejection_time: {
seconds: 300,
nanos: 0
nanos: 0,
},
max_ejection_percent: 10,
success_rate_ejection: undefined,
failure_percentage_ejection: undefined,
child_policy: [{round_robin: {}}]
}
child_policy: [{ round_robin: {} }],
},
},
{
name: 'empty ejection configs',
input: {
success_rate_ejection: {},
failure_percentage_ejection: {},
child_policy: [{round_robin: {}}]
child_policy: [{ round_robin: {} }],
},
output: {
interval: {
seconds: 10,
nanos: 0
nanos: 0,
},
base_ejection_time: {
seconds: 30,
nanos: 0
nanos: 0,
},
max_ejection_time: {
seconds: 300,
nanos: 0
nanos: 0,
},
max_ejection_percent: 10,
success_rate_ejection: {
stdev_factor: 1900,
enforcement_percentage: 100,
minimum_hosts: 5,
request_volume: 100
request_volume: 100,
},
failure_percentage_ejection: {
threshold: 85,
@ -150,30 +150,30 @@ const allTestCases: {[lbPolicyName: string]: TestCase[]} = {
minimum_hosts: 5,
request_volume: 50,
},
child_policy: [{round_robin: {}}]
}
child_policy: [{ round_robin: {} }],
},
},
{
name: 'all fields populated',
input: {
interval: {
seconds: 20,
nanos: 0
nanos: 0,
},
base_ejection_time: {
seconds: 40,
nanos: 0
nanos: 0,
},
max_ejection_time: {
seconds: 400,
nanos: 0
nanos: 0,
},
max_ejection_percent: 20,
success_rate_ejection: {
stdev_factor: 1800,
enforcement_percentage: 90,
minimum_hosts: 4,
request_volume: 200
request_volume: 200,
},
failure_percentage_ejection: {
threshold: 95,
@ -181,29 +181,34 @@ const allTestCases: {[lbPolicyName: string]: TestCase[]} = {
minimum_hosts: 4,
request_volume: 60,
},
child_policy: [{round_robin: {}}]
}
}
]
}
child_policy: [{ round_robin: {} }],
},
},
],
};
describe('Load balancing policy config parsing', () => {
for (const [lbPolicyName, testCases] of Object.entries(allTestCases)) {
describe(lbPolicyName, () => {
for (const testCase of testCases) {
it(testCase.name, () => {
const lbConfigInput = {[lbPolicyName]: testCase.input};
const lbConfigInput = { [lbPolicyName]: testCase.input };
if (testCase.error) {
assert.throws(() => {
parseLoadBalancingConfig(lbConfigInput);
}, testCase.error);
} else {
const expectedOutput = testCase.output ?? testCase.input;
const parsedJson = parseLoadBalancingConfig(lbConfigInput).toJsonObject();
assert.deepStrictEqual(parsedJson, {[lbPolicyName]: expectedOutput});
const parsedJson =
parseLoadBalancingConfig(lbConfigInput).toJsonObject();
assert.deepStrictEqual(parsedJson, {
[lbPolicyName]: expectedOutput,
});
// Test idempotency
assert.deepStrictEqual(parseLoadBalancingConfig(parsedJson).toJsonObject(), parsedJson);
assert.deepStrictEqual(
parseLoadBalancingConfig(parsedJson).toJsonObject(),
parsedJson
);
}
});
}

View File

@ -561,28 +561,43 @@ describe('pick_first load balancing policy', () => {
},
updateState: updateStateCallBackForExpectedStateSequence(
[ConnectivityState.CONNECTING, ConnectivityState.TRANSIENT_FAILURE],
err => setImmediate(() => {
assert.strictEqual(reresolutionRequestCount, targetReresolutionRequestCount);
done(err);
})
err =>
setImmediate(() => {
assert.strictEqual(
reresolutionRequestCount,
targetReresolutionRequestCount
);
done(err);
})
),
requestReresolution: () => {
reresolutionRequestCount += 1;
}
},
}
);
const pickFirst = new PickFirstLoadBalancer(channelControlHelper, {});
pickFirst.updateAddressList([{ addresses: [{ host: 'localhost', port: 1 }]}], config);
pickFirst.updateAddressList(
[{ addresses: [{ host: 'localhost', port: 1 }] }],
config
);
process.nextTick(() => {
subchannels[0].transitionToState(ConnectivityState.TRANSIENT_FAILURE);
process.nextTick(() => {
pickFirst.updateAddressList([{ addresses: [{ host: 'localhost', port: 2 }]}], config);
pickFirst.updateAddressList(
[{ addresses: [{ host: 'localhost', port: 2 }] }],
config
);
process.nextTick(() => {
subchannels[1].transitionToState(ConnectivityState.TRANSIENT_FAILURE);
process.nextTick(() => {
pickFirst.updateAddressList([{ addresses: [{ host: 'localhost', port: 3 }]}], config);
pickFirst.updateAddressList(
[{ addresses: [{ host: 'localhost', port: 3 }] }],
config
);
process.nextTick(() => {
subchannels[2].transitionToState(ConnectivityState.TRANSIENT_FAILURE);
subchannels[2].transitionToState(
ConnectivityState.TRANSIENT_FAILURE
);
});
});
});
@ -606,22 +621,35 @@ describe('pick_first load balancing policy', () => {
},
updateState: updateStateCallBackForExpectedStateSequence(
[ConnectivityState.TRANSIENT_FAILURE],
err => setImmediate(() => {
assert.strictEqual(reresolutionRequestCount, targetReresolutionRequestCount);
done(err);
})
err =>
setImmediate(() => {
assert.strictEqual(
reresolutionRequestCount,
targetReresolutionRequestCount
);
done(err);
})
),
requestReresolution: () => {
reresolutionRequestCount += 1;
}
},
}
);
const pickFirst = new PickFirstLoadBalancer(channelControlHelper, {});
pickFirst.updateAddressList([{ addresses: [{ host: 'localhost', port: 1 }]}], config);
pickFirst.updateAddressList(
[{ addresses: [{ host: 'localhost', port: 1 }] }],
config
);
process.nextTick(() => {
pickFirst.updateAddressList([{ addresses: [{ host: 'localhost', port: 2 }]}], config);
pickFirst.updateAddressList(
[{ addresses: [{ host: 'localhost', port: 2 }] }],
config
);
process.nextTick(() => {
pickFirst.updateAddressList([{ addresses: [{ host: 'localhost', port: 2 }]}], config);
pickFirst.updateAddressList(
[{ addresses: [{ host: 'localhost', port: 2 }] }],
config
);
});
});
});
@ -639,13 +667,20 @@ describe('pick_first load balancing policy', () => {
return subchannel;
},
updateState: updateStateCallBackForExpectedStateSequence(
[ConnectivityState.READY, ConnectivityState.IDLE, ConnectivityState.READY],
[
ConnectivityState.READY,
ConnectivityState.IDLE,
ConnectivityState.READY,
],
done
),
}
);
const pickFirst = new PickFirstLoadBalancer(channelControlHelper, {});
pickFirst.updateAddressList([{ addresses: [{ host: 'localhost', port: 1 }]}], config);
pickFirst.updateAddressList(
[{ addresses: [{ host: 'localhost', port: 1 }] }],
config
);
process.nextTick(() => {
subchannels[0].transitionToState(ConnectivityState.IDLE);
process.nextTick(() => {

View File

@ -26,23 +26,28 @@ const echoService = loadProtoFile(protoFile)
const AUTH_HEADER_KEY = 'auth';
const AUTH_HEADER_ALLOWED_VALUE = 'allowed';
const testAuthInterceptor: grpc.ServerInterceptor = (methodDescriptor, call) => {
const testAuthInterceptor: grpc.ServerInterceptor = (
methodDescriptor,
call
) => {
return new grpc.ServerInterceptingCall(call, {
start: next => {
const authListener: grpc.ServerListener = {
onReceiveMetadata: (metadata, mdNext) => {
if (metadata.get(AUTH_HEADER_KEY)?.[0] !== AUTH_HEADER_ALLOWED_VALUE) {
if (
metadata.get(AUTH_HEADER_KEY)?.[0] !== AUTH_HEADER_ALLOWED_VALUE
) {
call.sendStatus({
code: grpc.status.UNAUTHENTICATED,
details: 'Auth metadata not correct'
details: 'Auth metadata not correct',
});
} else {
mdNext(metadata);
}
}
},
};
next(authListener);
}
},
});
};
@ -52,7 +57,7 @@ let eventCounts = {
receiveHalfClose: 0,
sendMetadata: 0,
sendMessage: 0,
sendStatus: 0
sendStatus: 0,
};
function resetEventCounts() {
@ -62,7 +67,7 @@ function resetEventCounts() {
receiveHalfClose: 0,
sendMetadata: 0,
sendMessage: 0,
sendStatus: 0
sendStatus: 0,
};
}
@ -72,7 +77,10 @@ function resetEventCounts() {
* @param methodDescription
* @param call
*/
const testLoggingInterceptor: grpc.ServerInterceptor = (methodDescription, call) => {
const testLoggingInterceptor: grpc.ServerInterceptor = (
methodDescription,
call
) => {
return new grpc.ServerInterceptingCall(call, {
start: next => {
next({
@ -87,7 +95,7 @@ const testLoggingInterceptor: grpc.ServerInterceptor = (methodDescription, call)
onReceiveHalfClose: hcNext => {
eventCounts.receiveHalfClose += 1;
hcNext();
}
},
});
},
sendMetadata: (metadata, mdNext) => {
@ -101,21 +109,24 @@ const testLoggingInterceptor: grpc.ServerInterceptor = (methodDescription, call)
sendStatus: (status, statusNext) => {
eventCounts.sendStatus += 1;
statusNext(status);
}
},
});
};
const testHeaderInjectionInterceptor: grpc.ServerInterceptor = (methodDescriptor, call) => {
const testHeaderInjectionInterceptor: grpc.ServerInterceptor = (
methodDescriptor,
call
) => {
return new grpc.ServerInterceptingCall(call, {
start: next => {
const authListener: grpc.ServerListener = {
onReceiveMetadata: (metadata, mdNext) => {
metadata.set('injected-header', 'present');
mdNext(metadata);
}
},
};
next(authListener);
}
},
});
};
@ -126,22 +137,29 @@ describe('Server interceptors', () => {
/* Tests that an interceptor can entirely prevent the handler from being
* invoked, based on the contents of the metadata. */
before(done => {
server = new grpc.Server({interceptors: [testAuthInterceptor]});
server = new grpc.Server({ interceptors: [testAuthInterceptor] });
server.addService(echoService.service, {
echo: (
call: grpc.ServerUnaryCall<any, any>,
callback: grpc.sendUnaryData<any>
) => {
// A test will fail if a request makes it to the handler without the correct auth header
assert.strictEqual(call.metadata.get(AUTH_HEADER_KEY)?.[0], AUTH_HEADER_ALLOWED_VALUE);
assert.strictEqual(
call.metadata.get(AUTH_HEADER_KEY)?.[0],
AUTH_HEADER_ALLOWED_VALUE
);
callback(null, call.request);
},
});
server.bindAsync('localhost:0', grpc.ServerCredentials.createInsecure(), (error, port) => {
assert.ifError(error);
client = new TestClient(port, false);
done();
});
server.bindAsync(
'localhost:0',
grpc.ServerCredentials.createInsecure(),
(error, port) => {
assert.ifError(error);
client = new TestClient(port, false);
done();
}
);
});
after(done => {
client.close();
@ -165,7 +183,7 @@ describe('Server interceptors', () => {
let server: grpc.Server;
let client: TestClient;
before(done => {
server = new grpc.Server({interceptors: [testLoggingInterceptor]});
server = new grpc.Server({ interceptors: [testLoggingInterceptor] });
server.addService(echoService.service, {
echo: (
call: grpc.ServerUnaryCall<any, any>,
@ -175,11 +193,15 @@ describe('Server interceptors', () => {
callback(null, call.request);
},
});
server.bindAsync('localhost:0', grpc.ServerCredentials.createInsecure(), (error, port) => {
assert.ifError(error);
client = new TestClient(port, false);
done();
});
server.bindAsync(
'localhost:0',
grpc.ServerCredentials.createInsecure(),
(error, port) => {
assert.ifError(error);
client = new TestClient(port, false);
done();
}
);
});
after(done => {
client.close();
@ -197,7 +219,7 @@ describe('Server interceptors', () => {
receiveHalfClose: 1,
sendMetadata: 1,
sendMessage: 1,
sendStatus: 1
sendStatus: 1,
});
done();
});
@ -207,21 +229,30 @@ describe('Server interceptors', () => {
let server: grpc.Server;
let client: TestClient;
before(done => {
server = new grpc.Server({interceptors: [testHeaderInjectionInterceptor]});
server = new grpc.Server({
interceptors: [testHeaderInjectionInterceptor],
});
server.addService(echoService.service, {
echo: (
call: grpc.ServerUnaryCall<any, any>,
callback: grpc.sendUnaryData<any>
) => {
assert.strictEqual(call.metadata.get('injected-header')?.[0], 'present');
assert.strictEqual(
call.metadata.get('injected-header')?.[0],
'present'
);
callback(null, call.request);
},
});
server.bindAsync('localhost:0', grpc.ServerCredentials.createInsecure(), (error, port) => {
assert.ifError(error);
client = new TestClient(port, false);
done();
});
server.bindAsync(
'localhost:0',
grpc.ServerCredentials.createInsecure(),
(error, port) => {
assert.ifError(error);
client = new TestClient(port, false);
done();
}
);
});
after(done => {
client.close();
@ -235,23 +266,39 @@ describe('Server interceptors', () => {
let server: grpc.Server;
let client: TestClient;
before(done => {
server = new grpc.Server({interceptors: [testAuthInterceptor, testLoggingInterceptor, testHeaderInjectionInterceptor]});
server = new grpc.Server({
interceptors: [
testAuthInterceptor,
testLoggingInterceptor,
testHeaderInjectionInterceptor,
],
});
server.addService(echoService.service, {
echo: (
call: grpc.ServerUnaryCall<any, any>,
callback: grpc.sendUnaryData<any>
) => {
assert.strictEqual(call.metadata.get(AUTH_HEADER_KEY)?.[0], AUTH_HEADER_ALLOWED_VALUE);
assert.strictEqual(call.metadata.get('injected-header')?.[0], 'present');
assert.strictEqual(
call.metadata.get(AUTH_HEADER_KEY)?.[0],
AUTH_HEADER_ALLOWED_VALUE
);
assert.strictEqual(
call.metadata.get('injected-header')?.[0],
'present'
);
call.sendMetadata(new grpc.Metadata());
callback(null, call.request);
},
});
server.bindAsync('localhost:0', grpc.ServerCredentials.createInsecure(), (error, port) => {
assert.ifError(error);
client = new TestClient(port, false);
done();
});
server.bindAsync(
'localhost:0',
grpc.ServerCredentials.createInsecure(),
(error, port) => {
assert.ifError(error);
client = new TestClient(port, false);
done();
}
);
});
after(done => {
client.close();
@ -271,7 +318,7 @@ describe('Server interceptors', () => {
receiveHalfClose: 0,
sendMetadata: 0,
sendMessage: 0,
sendStatus: 0
sendStatus: 0,
});
done();
});
@ -287,7 +334,7 @@ describe('Server interceptors', () => {
receiveHalfClose: 1,
sendMetadata: 1,
sendMessage: 1,
sendStatus: 1
sendStatus: 1,
});
done();
});

View File

@ -149,14 +149,22 @@ describe('Server', () => {
});
it('succeeds when called with an already bound port', done => {
server.bindAsync('localhost:0', ServerCredentials.createInsecure(), (err, port) => {
assert.ifError(err);
server.bindAsync(`localhost:${port}`, ServerCredentials.createInsecure(), (err2, port2) => {
assert.ifError(err2);
assert.strictEqual(port, port2);
done();
});
});
server.bindAsync(
'localhost:0',
ServerCredentials.createInsecure(),
(err, port) => {
assert.ifError(err);
server.bindAsync(
`localhost:${port}`,
ServerCredentials.createInsecure(),
(err2, port2) => {
assert.ifError(err2);
assert.strictEqual(port, port2);
done();
}
);
}
);
});
it('fails when called on a bound port with different credentials', done => {
@ -165,15 +173,19 @@ describe('Server', () => {
[{ private_key: key, cert_chain: cert }],
true
);
server.bindAsync('localhost:0', ServerCredentials.createInsecure(), (err, port) => {
assert.ifError(err);
server.bindAsync(`localhost:${port}`, secureCreds, (err2, port2) => {
assert(err2 !== null);
assert.match(err2.message, /credentials/);
done();
})
});
})
server.bindAsync(
'localhost:0',
ServerCredentials.createInsecure(),
(err, port) => {
assert.ifError(err);
server.bindAsync(`localhost:${port}`, secureCreds, (err2, port2) => {
assert(err2 !== null);
assert.match(err2.message, /credentials/);
done();
});
}
);
});
});
describe('unbind', () => {
@ -188,42 +200,73 @@ describe('Server', () => {
assert.throws(() => {
server.unbind('localhost:0');
}, /port 0/);
server.bindAsync('localhost:0', ServerCredentials.createInsecure(), (err, port) => {
assert.ifError(err);
assert.notStrictEqual(port, 0);
assert.throws(() => {
server.unbind('localhost:0');
}, /port 0/);
done();
})
server.bindAsync(
'localhost:0',
ServerCredentials.createInsecure(),
(err, port) => {
assert.ifError(err);
assert.notStrictEqual(port, 0);
assert.throws(() => {
server.unbind('localhost:0');
}, /port 0/);
done();
}
);
});
it('successfully unbinds a bound ephemeral port', done => {
server.bindAsync('localhost:0', ServerCredentials.createInsecure(), (err, port) => {
client = new grpc.Client(`localhost:${port}`, grpc.credentials.createInsecure());
client.makeUnaryRequest('/math.Math/Div', x => x, x => x, Buffer.from('abc'), (callError1, result) => {
assert(callError1);
// UNIMPLEMENTED means that the request reached the call handling code
assert.strictEqual(callError1.code, grpc.status.UNIMPLEMENTED);
server.unbind(`localhost:${port}`);
const deadline = new Date();
deadline.setSeconds(deadline.getSeconds() + 1);
client!.makeUnaryRequest('/math.Math/Div', x => x, x => x, Buffer.from('abc'), {deadline: deadline}, (callError2, result) => {
assert(callError2);
// DEADLINE_EXCEEDED means that the server is unreachable
assert(callError2.code === grpc.status.DEADLINE_EXCEEDED || callError2.code === grpc.status.UNAVAILABLE);
done();
});
});
})
server.bindAsync(
'localhost:0',
ServerCredentials.createInsecure(),
(err, port) => {
client = new grpc.Client(
`localhost:${port}`,
grpc.credentials.createInsecure()
);
client.makeUnaryRequest(
'/math.Math/Div',
x => x,
x => x,
Buffer.from('abc'),
(callError1, result) => {
assert(callError1);
// UNIMPLEMENTED means that the request reached the call handling code
assert.strictEqual(callError1.code, grpc.status.UNIMPLEMENTED);
server.unbind(`localhost:${port}`);
const deadline = new Date();
deadline.setSeconds(deadline.getSeconds() + 1);
client!.makeUnaryRequest(
'/math.Math/Div',
x => x,
x => x,
Buffer.from('abc'),
{ deadline: deadline },
(callError2, result) => {
assert(callError2);
// DEADLINE_EXCEEDED means that the server is unreachable
assert(
callError2.code === grpc.status.DEADLINE_EXCEEDED ||
callError2.code === grpc.status.UNAVAILABLE
);
done();
}
);
}
);
}
);
});
it('cancels a bindAsync in progress', done => {
server.bindAsync('localhost:50051', ServerCredentials.createInsecure(), (err, port) => {
assert(err);
assert.match(err.message, /cancelled by unbind/);
done();
});
server.bindAsync(
'localhost:50051',
ServerCredentials.createInsecure(),
(err, port) => {
assert(err);
assert.match(err.message, /cancelled by unbind/);
done();
}
);
server.unbind('localhost:50051');
});
});
@ -282,7 +325,7 @@ describe('Server', () => {
call.on('data', () => {
server.drain(`localhost:${portNumber!}`, 100);
});
call.write({value: 'abc'});
call.write({ value: 'abc' });
});
});