Lint fixes

This commit is contained in:
murgatroid99 2019-08-28 18:03:57 -07:00
parent 5aef347fb9
commit b4d848865d
18 changed files with 736 additions and 355 deletions

View File

@ -22,8 +22,8 @@ const BACKOFF_JITTER = 0.2;
/**
* Get a number uniformly at random in the range [min, max)
* @param min
* @param max
* @param min
* @param max
*/
function uniformRandom(min: number, max: number) {
return Math.random() * (max - min) + min;
@ -43,7 +43,7 @@ export class BackoffTimeout {
private jitter: number = BACKOFF_JITTER;
private nextDelay: number;
private timerId: NodeJS.Timer;
private running: boolean = false;
private running = false;
constructor(private callback: () => void, options?: BackoffOptions) {
if (options) {
@ -74,9 +74,13 @@ export class BackoffTimeout {
this.callback();
this.running = false;
}, this.nextDelay);
const nextBackoff = Math.min(this.nextDelay * this.multiplier, this.maxDelay);
const nextBackoff = Math.min(
this.nextDelay * this.multiplier,
this.maxDelay
);
const jitterMagnitude = nextBackoff * this.jitter;
this.nextDelay = nextBackoff + uniformRandom(-jitterMagnitude, jitterMagnitude);
this.nextDelay =
nextBackoff + uniformRandom(-jitterMagnitude, jitterMagnitude);
}
/**
@ -98,4 +102,4 @@ export class BackoffTimeout {
isRunning() {
return this.running;
}
}
}

View File

@ -96,7 +96,9 @@ class ComposedCallCredentials extends CallCredentials {
return true;
}
if (other instanceof ComposedCallCredentials) {
return this.creds.every((value, index) => value._equals(other.creds[index]));
return this.creds.every((value, index) =>
value._equals(other.creds[index])
);
} else {
return false;
}
@ -134,7 +136,7 @@ class SingleCallCredentials extends CallCredentials {
return false;
}
}
}
}
class EmptyCallCredentials extends CallCredentials {
generateMetadata(options: CallMetadataOptions): Promise<Metadata> {

View File

@ -140,7 +140,12 @@ export abstract class ChannelCredentials {
'Certificate chain must be given with accompanying private key'
);
}
return new SecureChannelCredentialsImpl(rootCerts || null, privateKey || null, certChain || null, verifyOptions || {});
return new SecureChannelCredentialsImpl(
rootCerts || null,
privateKey || null,
certChain || null,
verifyOptions || {}
);
}
/**
@ -224,7 +229,10 @@ class SecureChannelCredentialsImpl extends ChannelCredentials {
if (!bufferOrNullEqual(this.certChain, other.certChain)) {
return false;
}
return this.verifyOptions.checkServerIdentity === other.verifyOptions.checkServerIdentity;
return (
this.verifyOptions.checkServerIdentity ===
other.verifyOptions.checkServerIdentity
);
} else {
return false;
}
@ -232,12 +240,20 @@ class SecureChannelCredentialsImpl extends ChannelCredentials {
}
class ComposedChannelCredentialsImpl extends ChannelCredentials {
constructor (private channelCredentials: SecureChannelCredentialsImpl, callCreds: CallCredentials) {
constructor(
private channelCredentials: SecureChannelCredentialsImpl,
callCreds: CallCredentials
) {
super(callCreds);
}
compose(callCredentials: CallCredentials) {
const combinedCallCredentials = this.callCredentials.compose(callCredentials);
return new ComposedChannelCredentialsImpl(this.channelCredentials, combinedCallCredentials);
const combinedCallCredentials = this.callCredentials.compose(
callCredentials
);
return new ComposedChannelCredentialsImpl(
this.channelCredentials,
combinedCallCredentials
);
}
_getConnectionOptions(): ConnectionOptions | null {
@ -251,7 +267,10 @@ class ComposedChannelCredentialsImpl extends ChannelCredentials {
return true;
}
if (other instanceof ComposedChannelCredentialsImpl) {
return this.channelCredentials._equals(other.channelCredentials) && this.callCredentials._equals(other.callCredentials);
return (
this.channelCredentials._equals(other.channelCredentials) &&
this.callCredentials._equals(other.callCredentials)
);
} else {
return false;
}

View File

@ -41,13 +41,16 @@ export const recognizedOptions = {
'grpc.keepalive_timeout_ms': true,
};
export function channelOptionsEqual(options1: ChannelOptions, options2: ChannelOptions) {
export function channelOptionsEqual(
options1: ChannelOptions,
options2: ChannelOptions
) {
const keys1 = Object.keys(options1).sort();
const keys2 = Object.keys(options2).sort();
if (keys1.length !== keys2.length) {
return false;
}
for (let i = 0; i < keys1.length; i+=1) {
for (let i = 0; i < keys1.length; i += 1) {
if (keys1[i] !== keys2[i]) {
return false;
}

View File

@ -15,23 +15,28 @@
*
*/
import { Deadline, Call, Http2CallStream, CallStreamOptions } from "./call-stream";
import { ChannelCredentials } from "./channel-credentials";
import { ChannelOptions } from "./channel-options";
import { ResolvingLoadBalancer } from "./resolving-load-balancer";
import { SubchannelPool, getSubchannelPool } from "./subchannel-pool";
import { ChannelControlHelper } from "./load-balancer";
import { UnavailablePicker, Picker, PickResultType } from "./picker";
import { Metadata } from "./metadata";
import { Status } from "./constants";
import { FilterStackFactory } from "./filter-stack";
import { CallCredentialsFilterFactory } from "./call-credentials-filter";
import { DeadlineFilterFactory } from "./deadline-filter";
import { MetadataStatusFilterFactory } from "./metadata-status-filter";
import { CompressionFilterFactory } from "./compression-filter";
import { getDefaultAuthority } from "./resolver";
import { LoadBalancingConfig } from "./load-balancing-config";
import { ServiceConfig } from "./service-config";
import {
Deadline,
Call,
Http2CallStream,
CallStreamOptions,
} from './call-stream';
import { ChannelCredentials } from './channel-credentials';
import { ChannelOptions } from './channel-options';
import { ResolvingLoadBalancer } from './resolving-load-balancer';
import { SubchannelPool, getSubchannelPool } from './subchannel-pool';
import { ChannelControlHelper } from './load-balancer';
import { UnavailablePicker, Picker, PickResultType } from './picker';
import { Metadata } from './metadata';
import { Status } from './constants';
import { FilterStackFactory } from './filter-stack';
import { CallCredentialsFilterFactory } from './call-credentials-filter';
import { DeadlineFilterFactory } from './deadline-filter';
import { MetadataStatusFilterFactory } from './metadata-status-filter';
import { CompressionFilterFactory } from './compression-filter';
import { getDefaultAuthority } from './resolver';
import { LoadBalancingConfig } from './load-balancing-config';
import { ServiceConfig } from './service-config';
export enum ConnectivityState {
CONNECTING,
@ -111,37 +116,58 @@ export class ChannelImplementation implements Channel {
private subchannelPool: SubchannelPool;
private connectivityState: ConnectivityState = ConnectivityState.IDLE;
private currentPicker: Picker = new UnavailablePicker();
private pickQueue: {callStream: Http2CallStream, callMetadata: Metadata}[] = [];
private pickQueue: Array<{
callStream: Http2CallStream;
callMetadata: Metadata;
}> = [];
private connectivityStateWatchers: ConnectivityStateWatcher[] = [];
private defaultAuthority: string;
private filterStackFactory: FilterStackFactory;
constructor(private target: string, private readonly credentials: ChannelCredentials, private readonly options: ChannelOptions) {
constructor(
private target: string,
private readonly credentials: ChannelCredentials,
private readonly options: ChannelOptions
) {
// TODO: check channel arg for getting a private pool
this.subchannelPool = getSubchannelPool(true);
const channelControlHelper: ChannelControlHelper = {
createSubchannel: (subchannelAddress: string, subchannelArgs: ChannelOptions) => {
return this.subchannelPool.getOrCreateSubchannel(this.target, subchannelAddress, Object.assign({}, this.options, subchannelArgs), this.credentials);
createSubchannel: (
subchannelAddress: string,
subchannelArgs: ChannelOptions
) => {
return this.subchannelPool.getOrCreateSubchannel(
this.target,
subchannelAddress,
Object.assign({}, this.options, subchannelArgs),
this.credentials
);
},
updateState: (connectivityState: ConnectivityState, picker: Picker) => {
this.currentPicker = picker;
const queueCopy = this.pickQueue.slice();
this.pickQueue = [];
for (const {callStream, callMetadata} of queueCopy) {
for (const { callStream, callMetadata } of queueCopy) {
this.tryPick(callStream, callMetadata);
}
this.updateState(connectivityState);
},
requestReresolution: () => {
// This should never be called.
throw new Error('Resolving load balancer should never call requestReresolution');
}
throw new Error(
'Resolving load balancer should never call requestReresolution'
);
},
};
// TODO: check channel arg for default service config
const defaultServiceConfig: ServiceConfig = {
loadBalancingConfig: [],
methodConfig: []
}
this.resolvingLoadBalancer = new ResolvingLoadBalancer(target, channelControlHelper, defaultServiceConfig);
methodConfig: [],
};
this.resolvingLoadBalancer = new ResolvingLoadBalancer(
target,
channelControlHelper,
defaultServiceConfig
);
this.filterStackFactory = new FilterStackFactory([
new CallCredentialsFilterFactory(this),
new DeadlineFilterFactory(this),
@ -160,50 +186,79 @@ export class ChannelImplementation implements Channel {
* Check the picker output for the given call and corresponding metadata,
* and take any relevant actions. Should not be called while iterating
* over pickQueue.
* @param callStream
* @param callMetadata
* @param callStream
* @param callMetadata
*/
private tryPick(callStream: Http2CallStream, callMetadata: Metadata) {
const pickResult = this.currentPicker.pick({metadata: callMetadata});
switch(pickResult.pickResultType) {
const pickResult = this.currentPicker.pick({ metadata: callMetadata });
switch (pickResult.pickResultType) {
case PickResultType.COMPLETE:
if (pickResult.subchannel === null) {
callStream.cancelWithStatus(Status.UNAVAILABLE, "Request dropped by load balancing policy");
callStream.cancelWithStatus(
Status.UNAVAILABLE,
'Request dropped by load balancing policy'
);
// End the call with an error
} else {
/* If the subchannel disconnects between calling pick and getting
* the filter stack metadata, the call will end with an error. */
callStream.filterStack.sendMetadata(Promise.resolve(new Metadata())).then((finalMetadata) => {
if (pickResult.subchannel!.getConnectivityState() === ConnectivityState.READY) {
pickResult.subchannel!.startCallStream(callMetadata, callStream);
} else {
callStream.cancelWithStatus(Status.UNAVAILABLE, 'Connection dropped while starting call');
}
},
(error: Error & { code: number }) => {
// We assume the error code isn't 0 (Status.OK)
callStream.cancelWithStatus(
error.code || Status.UNKNOWN,
`Getting metadata from plugin failed with error: ${error.message}`
callStream.filterStack
.sendMetadata(Promise.resolve(new Metadata()))
.then(
finalMetadata => {
if (
pickResult.subchannel!.getConnectivityState() ===
ConnectivityState.READY
) {
pickResult.subchannel!.startCallStream(
callMetadata,
callStream
);
} else {
callStream.cancelWithStatus(
Status.UNAVAILABLE,
'Connection dropped while starting call'
);
}
},
(error: Error & { code: number }) => {
// We assume the error code isn't 0 (Status.OK)
callStream.cancelWithStatus(
error.code || Status.UNKNOWN,
`Getting metadata from plugin failed with error: ${
error.message
}`
);
}
);
});
}
break;
case PickResultType.QUEUE:
this.pickQueue.push({callStream, callMetadata});
this.pickQueue.push({ callStream, callMetadata });
break;
case PickResultType.TRANSIENT_FAILURE:
if (callMetadata.getOptions().waitForReady) {
this.pickQueue.push({callStream, callMetadata});
this.pickQueue.push({ callStream, callMetadata });
} else {
callStream.cancelWithStatus(pickResult.status!.code, pickResult.status!.details);
callStream.cancelWithStatus(
pickResult.status!.code,
pickResult.status!.details
);
}
break;
default:
throw new Error(
`Invalid state: unknown pickResultType ${pickResult.pickResultType}`
);
}
}
private removeConnectivityStateWatcher(watcherObject: ConnectivityStateWatcher) {
const watcherIndex = this.connectivityStateWatchers.findIndex((value) => value === watcherObject);
private removeConnectivityStateWatcher(
watcherObject: ConnectivityStateWatcher
) {
const watcherIndex = this.connectivityStateWatchers.findIndex(
value => value === watcherObject
);
if (watcherIndex >= 0) {
this.connectivityStateWatchers.splice(watcherIndex, 1);
}
@ -237,25 +292,31 @@ export class ChannelImplementation implements Channel {
getConnectivityState() {
return this.connectivityState;
}
watchConnectivityState(
currentState: ConnectivityState,
deadline: Date | number,
callback: (error?: Error) => void
): void {
const deadlineDate: Date = deadline instanceof Date ? deadline : new Date(deadline);
const deadlineDate: Date =
deadline instanceof Date ? deadline : new Date(deadline);
const now = new Date();
if (deadlineDate <= now) {
process.nextTick(callback, new Error('Deadline passed without connectivity state change'));
process.nextTick(
callback,
new Error('Deadline passed without connectivity state change')
);
return;
}
const watcherObject = {
currentState,
currentState,
callback,
timer: setTimeout(() => {
this.removeConnectivityStateWatcher(watcherObject);
callback(new Error('Deadline passed without connectivity state change'));
}, deadlineDate.getTime() - now.getTime())
callback(
new Error('Deadline passed without connectivity state change')
);
}, deadlineDate.getTime() - now.getTime()),
};
this.connectivityStateWatchers.push(watcherObject);
}
@ -286,4 +347,4 @@ export class ChannelImplementation implements Channel {
);
return stream;
}
}
}

View File

@ -78,7 +78,11 @@ export class Client {
options
);
} else {
this[CHANNEL_SYMBOL] = new ChannelImplementation(address, credentials, options);
this[CHANNEL_SYMBOL] = new ChannelImplementation(
address,
credentials,
options
);
}
}

View File

@ -138,7 +138,9 @@ class UnknownHandler extends CompressionHandler {
compressMessage(message: Buffer): Promise<Buffer> {
return Promise.reject<Buffer>(
new Error(
`Received message compressed wth unsupported compression method ${this.compressionName}`
`Received message compressed wth unsupported compression method ${
this.compressionName
}`
)
);
}

View File

@ -15,9 +15,20 @@
*
*/
import { LoadBalancer, ChannelControlHelper, registerLoadBalancerType } from './load-balancer';
import {
LoadBalancer,
ChannelControlHelper,
registerLoadBalancerType,
} from './load-balancer';
import { ConnectivityState } from './channel';
import { QueuePicker, Picker, PickArgs, CompletePickResult, PickResultType, UnavailablePicker } from './picker';
import {
QueuePicker,
Picker,
PickArgs,
CompletePickResult,
PickResultType,
UnavailablePicker,
} from './picker';
import { LoadBalancingConfig } from './load-balancing-config';
import { Subchannel, ConnectivityStateListener } from './subchannel';
@ -36,12 +47,12 @@ const CONNECTION_DELAY_INTERVAL_MS = 250;
class PickFirstPicker implements Picker {
constructor(private subchannel: Subchannel) {}
pick(pickArgs: PickArgs) : CompletePickResult {
pick(pickArgs: PickArgs): CompletePickResult {
return {
pickResultType: PickResultType.COMPLETE,
subchannel: this.subchannel,
status: null
}
status: null,
};
}
}
@ -63,12 +74,12 @@ export class PickFirstLoadBalancer implements LoadBalancer {
* The index within the `subchannels` array of the subchannel with the most
* recently started connection attempt.
*/
private currentSubchannelIndex: number = 0;
private currentSubchannelIndex = 0;
/**
* The number of subchannels in the `subchannels` list currently in the
* CONNECTING state. Used to determine the overall load balancer state.
*/
private subchannelConnectingCount: number = 0;
private subchannelConnectingCount = 0;
/**
* The currently picked subchannel used for making calls. Populated if
* and only if the load balancer's current state is READY. In that case,
@ -85,11 +96,11 @@ export class PickFirstLoadBalancer implements LoadBalancer {
*/
private pickedSubchannelStateListener: ConnectivityStateListener;
/**
* Timer reference for the timer tracking when to start
* Timer reference for the timer tracking when to start
*/
private connectionDelayTimeout: NodeJS.Timeout;
private triedAllSubchannels: boolean = false;
private triedAllSubchannels = false;
/**
* Load balancer that attempts to connect to each backend in the address list
@ -100,7 +111,11 @@ export class PickFirstLoadBalancer implements LoadBalancer {
*/
constructor(private channelControlHelper: ChannelControlHelper) {
this.updateState(ConnectivityState.IDLE, new QueuePicker(this));
this.subchannelStateListener = (subchannel: Subchannel, previousState: ConnectivityState, newState: ConnectivityState) => {
this.subchannelStateListener = (
subchannel: Subchannel,
previousState: ConnectivityState,
newState: ConnectivityState
) => {
if (previousState === ConnectivityState.CONNECTING) {
this.subchannelConnectingCount -= 1;
}
@ -112,7 +127,10 @@ export class PickFirstLoadBalancer implements LoadBalancer {
return;
} else {
if (this.currentPick === null) {
if (newState === ConnectivityState.TRANSIENT_FAILURE || newState === ConnectivityState.IDLE) {
if (
newState === ConnectivityState.TRANSIENT_FAILURE ||
newState === ConnectivityState.IDLE
) {
process.nextTick(() => {
subchannel.startConnecting();
});
@ -121,11 +139,17 @@ export class PickFirstLoadBalancer implements LoadBalancer {
* to goes into TRANSIENT_FAILURE, immediately try to start
* connecting to the next one instead of waiting for the connection
* delay timer. */
if (subchannel === this.subchannels[this.currentSubchannelIndex] && newState === ConnectivityState.TRANSIENT_FAILURE) {
if (
subchannel === this.subchannels[this.currentSubchannelIndex] &&
newState === ConnectivityState.TRANSIENT_FAILURE
) {
this.startNextSubchannelConnecting();
}
if (this.triedAllSubchannels) {
const newLBState = this.subchannelConnectingCount > 0 ? ConnectivityState.CONNECTING : ConnectivityState.TRANSIENT_FAILURE;
const newLBState =
this.subchannelConnectingCount > 0
? ConnectivityState.CONNECTING
: ConnectivityState.TRANSIENT_FAILURE;
if (newLBState !== this.currentState) {
if (newLBState === ConnectivityState.TRANSIENT_FAILURE) {
this.updateState(newLBState, new UnavailablePicker());
@ -134,17 +158,29 @@ export class PickFirstLoadBalancer implements LoadBalancer {
}
}
} else {
this.updateState(ConnectivityState.CONNECTING, new QueuePicker(this));
this.updateState(
ConnectivityState.CONNECTING,
new QueuePicker(this)
);
}
}
}
};
this.pickedSubchannelStateListener = (subchannel: Subchannel, previousState: ConnectivityState, newState: ConnectivityState) => {
this.pickedSubchannelStateListener = (
subchannel: Subchannel,
previousState: ConnectivityState,
newState: ConnectivityState
) => {
if (newState !== ConnectivityState.READY) {
subchannel.unref();
subchannel.removeConnectivityStateListener(this.pickedSubchannelStateListener);
subchannel.removeConnectivityStateListener(
this.pickedSubchannelStateListener
);
if (this.subchannels.length > 0) {
const newLBState = this.subchannelConnectingCount > 0 ? ConnectivityState.CONNECTING : ConnectivityState.TRANSIENT_FAILURE;
const newLBState =
this.subchannelConnectingCount > 0
? ConnectivityState.CONNECTING
: ConnectivityState.TRANSIENT_FAILURE;
if (newLBState === ConnectivityState.TRANSIENT_FAILURE) {
this.updateState(newLBState, new UnavailablePicker());
} else {
@ -160,7 +196,6 @@ export class PickFirstLoadBalancer implements LoadBalancer {
clearTimeout(this.connectionDelayTimeout);
}
private startNextSubchannelConnecting() {
if (this.triedAllSubchannels) {
return;
@ -168,7 +203,10 @@ export class PickFirstLoadBalancer implements LoadBalancer {
for (const [index, subchannel] of this.subchannels.entries()) {
if (index > this.currentSubchannelIndex) {
const subchannelState = subchannel.getConnectivityState();
if (subchannelState === ConnectivityState.IDLE || subchannelState === ConnectivityState.CONNECTING) {
if (
subchannelState === ConnectivityState.IDLE ||
subchannelState === ConnectivityState.CONNECTING
) {
this.startConnecting(index);
return;
}
@ -184,20 +222,25 @@ export class PickFirstLoadBalancer implements LoadBalancer {
private startConnecting(subchannelIndex: number) {
clearTimeout(this.connectionDelayTimeout);
this.currentSubchannelIndex = subchannelIndex;
if (this.subchannels[subchannelIndex].getConnectivityState() === ConnectivityState.IDLE) {
if (
this.subchannels[subchannelIndex].getConnectivityState() ===
ConnectivityState.IDLE
) {
process.nextTick(() => {
this.subchannels[subchannelIndex].startConnecting();
});
}
this.connectionDelayTimeout = setTimeout(() => {
this.startNextSubchannelConnecting();
}, CONNECTION_DELAY_INTERVAL_MS)
}, CONNECTION_DELAY_INTERVAL_MS);
}
private pickSubchannel(subchannel: Subchannel) {
if (this.currentPick !== null) {
this.currentPick.unref();
this.currentPick.removeConnectivityStateListener(this.pickedSubchannelStateListener);
this.currentPick.removeConnectivityStateListener(
this.pickedSubchannelStateListener
);
}
this.currentPick = subchannel;
this.updateState(ConnectivityState.READY, new PickFirstPicker(subchannel));
@ -229,7 +272,9 @@ export class PickFirstLoadBalancer implements LoadBalancer {
*/
private connectToAddressList(): void {
this.resetSubchannelList();
this.subchannels = this.latestAddressList.map((address) => this.channelControlHelper.createSubchannel(address, {}));
this.subchannels = this.latestAddressList.map(address =>
this.channelControlHelper.createSubchannel(address, {})
);
for (const subchannel of this.subchannels) {
subchannel.ref();
}
@ -237,24 +282,36 @@ export class PickFirstLoadBalancer implements LoadBalancer {
subchannel.addConnectivityStateListener(this.subchannelStateListener);
if (subchannel.getConnectivityState() === ConnectivityState.READY) {
this.pickSubchannel(subchannel);
this.updateState(ConnectivityState.READY, new PickFirstPicker(subchannel));
this.updateState(
ConnectivityState.READY,
new PickFirstPicker(subchannel)
);
this.resetSubchannelList();
return;
}
}
for (const [index, subchannel] of this.subchannels.entries()) {
const subchannelState = subchannel.getConnectivityState();
if (subchannelState === ConnectivityState.IDLE || subchannelState === ConnectivityState.CONNECTING) {
if (
subchannelState === ConnectivityState.IDLE ||
subchannelState === ConnectivityState.CONNECTING
) {
this.startConnecting(index);
this.updateState(ConnectivityState.CONNECTING, new QueuePicker(this));
return;
}
}
// If the code reaches this point, every subchannel must be in TRANSIENT_FAILURE
this.updateState(ConnectivityState.TRANSIENT_FAILURE, new UnavailablePicker());
this.updateState(
ConnectivityState.TRANSIENT_FAILURE,
new UnavailablePicker()
);
}
updateAddressList(addressList: string[], lbConfig: LoadBalancingConfig | null): void {
updateAddressList(
addressList: string[],
lbConfig: LoadBalancingConfig | null
): void {
// lbConfig has no useful information for pick first load balancing
this.latestAddressList = addressList;
this.connectToAddressList();
@ -277,7 +334,9 @@ export class PickFirstLoadBalancer implements LoadBalancer {
this.resetSubchannelList();
if (this.currentPick !== null) {
this.currentPick.unref();
this.currentPick.removeConnectivityStateListener(this.pickedSubchannelStateListener);
this.currentPick.removeConnectivityStateListener(
this.pickedSubchannelStateListener
);
}
}
@ -292,4 +351,4 @@ export class PickFirstLoadBalancer implements LoadBalancer {
export function setup(): void {
registerLoadBalancerType(TYPE_NAME, PickFirstLoadBalancer);
}
}

View File

@ -15,11 +15,11 @@
*
*/
import { ChannelOptions } from "./channel-options";
import { Subchannel } from "./subchannel";
import { ConnectivityState } from "./channel";
import { Picker } from "./picker";
import { LoadBalancingConfig } from "./load-balancing-config";
import { ChannelOptions } from './channel-options';
import { Subchannel } from './subchannel';
import { ConnectivityState } from './channel';
import { Picker } from './picker';
import { LoadBalancingConfig } from './load-balancing-config';
import * as load_balancer_pick_first from './load-balancer-pick-first';
/**
@ -32,7 +32,10 @@ export interface ChannelControlHelper {
* @param subchannelAddress The address to connect to
* @param subchannelArgs Extra channel arguments specified by the load balancer
*/
createSubchannel(subchannelAddress: string, subchannelArgs: ChannelOptions): Subchannel;
createSubchannel(
subchannelAddress: string,
subchannelArgs: ChannelOptions
): Subchannel;
/**
* Passes a new subchannel picker up to the channel. This is called if either
* the connectivity state changes or if a different picker is needed for any
@ -61,7 +64,10 @@ export interface LoadBalancer {
* @param lbConfig The load balancing config object from the service config,
* if one was provided
*/
updateAddressList(addressList: string[], lbConfig: LoadBalancingConfig | null): void;
updateAddressList(
addressList: string[],
lbConfig: LoadBalancingConfig | null
): void;
/**
* If the load balancer is currently in the IDLE state, start connecting.
*/
@ -91,16 +97,24 @@ export interface LoadBalancer {
}
export interface LoadBalancerConstructor {
new(channelControlHelper: ChannelControlHelper): LoadBalancer;
new (channelControlHelper: ChannelControlHelper): LoadBalancer;
}
const registeredLoadBalancerTypes: {[name: string]: LoadBalancerConstructor} = {};
const registeredLoadBalancerTypes: {
[name: string]: LoadBalancerConstructor;
} = {};
export function registerLoadBalancerType(typeName: string, loadBalancerType: LoadBalancerConstructor) {
export function registerLoadBalancerType(
typeName: string,
loadBalancerType: LoadBalancerConstructor
) {
registeredLoadBalancerTypes[typeName] = loadBalancerType;
}
export function createLoadBalancer(typeName: string, channelControlHelper: ChannelControlHelper): LoadBalancer | null {
export function createLoadBalancer(
typeName: string,
channelControlHelper: ChannelControlHelper
): LoadBalancer | null {
if (typeName in registeredLoadBalancerTypes) {
return new registeredLoadBalancerTypes[typeName](channelControlHelper);
} else {
@ -114,4 +128,4 @@ export function isLoadBalancerNameRegistered(typeName: string): boolean {
export function registerAll() {
load_balancer_pick_first.setup();
}
}

View File

@ -21,10 +21,11 @@
* specific object type if the input has the right structure, and throws an
* error otherwise. */
import { isString, isArray } from "util";
/* The any type is purposely used here. All functions validate their input at
* runtime */
/* tslint:disable:no-any */
export interface RoundRobinConfig {
}
export interface RoundRobinConfig {}
export interface XdsConfig {
balancerName: string;
@ -48,16 +49,16 @@ export interface LoadBalancingConfig {
* effectively */
function validateXdsConfig(xds: any): XdsConfig {
if (!('balancerName' in xds) || !isString(xds.balancerName)) {
if (!('balancerName' in xds) || typeof xds.balancerName !== 'string') {
throw new Error('Invalid xds config: invalid balancerName');
}
const xdsConfig: XdsConfig = {
balancerName: xds.balancerName,
childPolicy: [],
fallbackPolicy: []
fallbackPolicy: [],
};
if ('childPolicy' in xds) {
if (!isArray(xds.childPolicy)) {
if (!Array.isArray(xds.childPolicy)) {
throw new Error('Invalid xds config: invalid childPolicy');
}
for (const policy of xds.childPolicy) {
@ -65,7 +66,7 @@ function validateXdsConfig(xds: any): XdsConfig {
}
}
if ('fallbackPolicy' in xds) {
if (!isArray(xds.fallbackPolicy)) {
if (!Array.isArray(xds.fallbackPolicy)) {
throw new Error('Invalid xds config: invalid fallbackPolicy');
}
for (const policy of xds.fallbackPolicy) {
@ -77,10 +78,10 @@ function validateXdsConfig(xds: any): XdsConfig {
function validateGrpcLbConfig(grpclb: any): GrpcLbConfig {
const grpcLbConfig: GrpcLbConfig = {
childPolicy: []
childPolicy: [],
};
if ('childPolicy' in grpclb) {
if (!isArray(grpclb.childPolicy)) {
if (!Array.isArray(grpclb.childPolicy)) {
throw new Error('Invalid xds config: invalid childPolicy');
}
for (const policy of grpclb.childPolicy) {
@ -96,17 +97,17 @@ export function validateConfig(obj: any): LoadBalancingConfig {
throw new Error('Multiple load balancing policies configured');
}
if (obj['round_robin'] instanceof Object) {
return { round_robin: {} }
return { round_robin: {} };
}
}
if ('xds' in obj) {
if ('grpclb' in obj) {
throw new Error('Multiple load balancing policies configured');
}
return {xds: validateXdsConfig(obj.xds)};
return { xds: validateXdsConfig(obj.xds) };
}
if ('grpclb' in obj) {
return {grpclb: validateGrpcLbConfig(obj.grpclb)};
return { grpclb: validateGrpcLbConfig(obj.grpclb) };
}
throw new Error('No recognized load balancing policy configured');
}
}

View File

@ -15,16 +15,16 @@
*
*/
import { Subchannel } from "./subchannel";
import { StatusObject } from "./call-stream";
import { Metadata } from "./metadata";
import { Status } from "./constants";
import { LoadBalancer } from "./load-balancer";
import { Subchannel } from './subchannel';
import { StatusObject } from './call-stream';
import { Metadata } from './metadata';
import { Status } from './constants';
import { LoadBalancer } from './load-balancer';
export enum PickResultType {
COMPLETE,
QUEUE,
TRANSIENT_FAILURE
TRANSIENT_FAILURE,
}
export interface PickResult {
@ -85,8 +85,8 @@ export class UnavailablePicker implements Picker {
} else {
this.status = {
code: Status.UNAVAILABLE,
details: "No connection established",
metadata: new Metadata()
details: 'No connection established',
metadata: new Metadata(),
};
}
}
@ -94,7 +94,7 @@ export class UnavailablePicker implements Picker {
return {
pickResultType: PickResultType.TRANSIENT_FAILURE,
subchannel: null,
status: this.status
status: this.status,
};
}
}
@ -107,7 +107,7 @@ export class UnavailablePicker implements Picker {
* once any pick is attempted.
*/
export class QueuePicker {
private calledExitIdle: boolean = false;
private calledExitIdle = false;
// Constructed with a load balancer. Calls exitIdle on it the first time pick is called
constructor(private loadBalancer: LoadBalancer) {}
@ -119,7 +119,7 @@ export class QueuePicker {
return {
pickResultType: PickResultType.QUEUE,
subchannel: null,
status: null
}
status: null,
};
}
}
}

View File

@ -14,7 +14,12 @@
* limitations under the License.
*/
import { Resolver, ResolverListener, registerResolver, registerDefaultResolver } from './resolver';
import {
Resolver,
ResolverListener,
registerResolver,
registerDefaultResolver,
} from './resolver';
import * as dns from 'dns';
import * as util from 'util';
import { extractAndSelectServiceConfig, ServiceConfig } from './service-config';
@ -30,17 +35,17 @@ import { Metadata } from './metadata';
* Matches 4 groups of up to 3 digits each, separated by periods, optionally
* followed by a colon and a number.
*/
const IPv4_REGEX = /^(\d{1,3}(?:\.\d{1,3}){3})(?::(\d+))?$/;
const IPV4_REGEX = /^(\d{1,3}(?:\.\d{1,3}){3})(?::(\d+))?$/;
/**
* Matches any number of groups of up to 4 hex digits (case insensitive)
* separated by 1 or more colons. This variant does not match a port number.
*/
const IPv6_REGEX = /^([0-9a-f]{0,4}(?::{1,2}[0-9a-f]{0,4})+)$/i;
const IPV6_REGEX = /^([0-9a-f]{0,4}(?::{1,2}[0-9a-f]{0,4})+)$/i;
/**
* Matches the same as the IPv6_REGEX, surrounded by square brackets, and
* optionally followed by a colon and a number.
*/
const IPv6_BRACKET_REGEX = /^\[([0-9a-f]{0,4}(?::{1,2}[0-9a-f]{0,4})+)\](?::(\d+))?$/i;
const IPV6_BRACKET_REGEX = /^\[([0-9a-f]{0,4}(?::{1,2}[0-9a-f]{0,4})+)\](?::(\d+))?$/i;
/**
* Matches `[dns:][//authority/]host[:port]`, where `authority` and `host` are
@ -60,13 +65,16 @@ const resolve6Promise = util.promisify(dns.resolve6);
/**
* Attempt to parse a target string as an IP address
* @param target
* @param target
* @return An "IP:port" string if parsing was successful, `null` otherwise
*/
function parseIP(target: string): string | null {
/* These three regular expressions are all mutually exclusive, so we just
* want the first one that matches the target string, if any do. */
const match = IPv4_REGEX.exec(target) || IPv6_REGEX.exec(target) || IPv6_BRACKET_REGEX.exec(target);
const match =
IPV4_REGEX.exec(target) ||
IPV6_REGEX.exec(target) ||
IPV6_BRACKET_REGEX.exec(target);
if (match === null) {
return null;
}
@ -82,13 +90,17 @@ function parseIP(target: string): string | null {
/**
* Merge any number of arrays into a single alternating array
* @param arrays
* @param arrays
*/
function mergeArrays<T>(...arrays: T[][]): T[] {
const result: T[] = [];
for(let i = 0; i<Math.max.apply(null, arrays.map((array)=> array.length)); i++) {
for(let array of arrays) {
if(i < array.length) {
for (
let i = 0;
i < Math.max.apply(null, arrays.map(array => array.length));
i++
) {
for (const array of arrays) {
if (i < array.length) {
result.push(array[i]);
}
}
@ -105,7 +117,9 @@ class DnsResolver implements Resolver {
private readonly port: string | null;
/* The promise results here contain, in order, the A record, the AAAA record,
* and either the TXT record or an error if TXT resolution failed */
pendingResultPromise: Promise<[string[], string[], string[][] | Error]> | null = null;
pendingResultPromise: Promise<
[string[], string[], string[][] | Error]
> | null = null;
percentage: number;
constructor(private target: string, private listener: ResolverListener) {
this.ipResult = parseIP(target);
@ -126,7 +140,7 @@ class DnsResolver implements Resolver {
/**
* If the target is an IP address, just provide that address as a result.
* Otherwise, initiate A, AAAA, and TXT
* Otherwise, initiate A, AAAA, and TXT
*/
private startResolution() {
if (this.ipResult !== null) {
@ -137,12 +151,12 @@ class DnsResolver implements Resolver {
}
if (this.dnsHostname !== null) {
const hostname: string = this.dnsHostname;
const Aresult = resolve4Promise(hostname);
const AAAAresult = resolve6Promise(hostname);
const aResult = resolve4Promise(hostname);
const aaaaResult = resolve6Promise(hostname);
/* We handle the TXT query promise differently than the others because
* the name resolution attempt as a whole is a success even if the TXT
* lookup fails */
const TXTresult = new Promise<string[][] | Error>((resolve, reject) => {
const txtResult = new Promise<string[][] | Error>((resolve, reject) => {
dns.resolveTxt(hostname, (err, records) => {
if (err) {
resolve(err);
@ -151,41 +165,51 @@ class DnsResolver implements Resolver {
}
});
});
this.pendingResultPromise = Promise.all([Aresult, AAAAresult, TXTresult]);
this.pendingResultPromise.then(([Arecord, AAAArecord, TXTrecord]) => {
this.pendingResultPromise = null;
Arecord = Arecord.map((value) => `${value}:${this.port}`);
AAAArecord = AAAArecord.map((value) => `[${value}]:${this.port}`);
const allAddresses: string[] = mergeArrays(AAAArecord, Arecord);
let serviceConfig: ServiceConfig | null = null;
let serviceConfigError: StatusObject | null = null;
if (TXTrecord instanceof Error) {
serviceConfigError = {
code: Status.UNAVAILABLE,
details: 'TXT query failed',
metadata: new Metadata()
};
} else {
try {
serviceConfig = extractAndSelectServiceConfig(TXTrecord, this.percentage);
} catch (err) {
this.pendingResultPromise = Promise.all([aResult, aaaaResult, txtResult]);
this.pendingResultPromise.then(
([aRecord, aaaaRecord, txtRecord]) => {
this.pendingResultPromise = null;
aRecord = aRecord.map(value => `${value}:${this.port}`);
aaaaRecord = aaaaRecord.map(value => `[${value}]:${this.port}`);
const allAddresses: string[] = mergeArrays(aaaaRecord, aRecord);
let serviceConfig: ServiceConfig | null = null;
let serviceConfigError: StatusObject | null = null;
if (txtRecord instanceof Error) {
serviceConfigError = {
code: Status.UNAVAILABLE,
details: 'Parsing service config failed',
metadata: new Metadata()
details: 'TXT query failed',
metadata: new Metadata(),
};
} else {
try {
serviceConfig = extractAndSelectServiceConfig(
txtRecord,
this.percentage
);
} catch (err) {
serviceConfigError = {
code: Status.UNAVAILABLE,
details: 'Parsing service config failed',
metadata: new Metadata(),
};
}
}
this.listener.onSuccessfulResolution(
allAddresses,
serviceConfig,
serviceConfigError
);
},
err => {
this.pendingResultPromise = null;
this.listener.onError({
code: Status.UNAVAILABLE,
details: 'Name resolution failed',
metadata: new Metadata(),
});
this.listener.onError(err);
}
this.listener.onSuccessfulResolution(allAddresses, serviceConfig, serviceConfigError);
}, (err) => {
this.pendingResultPromise = null;
this.listener.onError({
code: Status.UNAVAILABLE,
details: 'Name resolution failed',
metadata: new Metadata()
});
this.listener.onError(err);
});
);
}
}
@ -198,10 +222,13 @@ class DnsResolver implements Resolver {
/**
* Get the default authority for the given target. For IP targets, that is
* the IP address. For DNS targets, it is the hostname.
* @param target
* @param target
*/
static getDefaultAuthority(target: string): string {
const ipMatch = IPv4_REGEX.exec(target) || IPv6_REGEX.exec(target) || IPv6_BRACKET_REGEX.exec(target);
const ipMatch =
IPV4_REGEX.exec(target) ||
IPV6_REGEX.exec(target) ||
IPV6_BRACKET_REGEX.exec(target);
if (ipMatch) {
return ipMatch[1];
}
@ -220,4 +247,4 @@ class DnsResolver implements Resolver {
export function setup(): void {
registerResolver('dns:', DnsResolver);
registerDefaultResolver(DnsResolver);
}
}

View File

@ -15,10 +15,10 @@
*
*/
import { ServiceError } from "./call";
import { ServiceConfig } from "./service-config";
import { ServiceError } from './call';
import { ServiceConfig } from './service-config';
import * as resolver_dns from './resolver-dns';
import { StatusObject } from "./call-stream";
import { StatusObject } from './call-stream';
/**
* A listener object passed to the resolver's constructor that provides name
@ -34,7 +34,11 @@ export interface ResolverListener {
* @param serviceConfigError If non-`null`, indicates that the retrieved
* service configuration was invalid
*/
onSuccessfulResolution(addressList: string[], serviceConfig: ServiceConfig | null, serviceConfigError: StatusObject | null): void;
onSuccessfulResolution(
addressList: string[],
serviceConfig: ServiceConfig | null,
serviceConfigError: StatusObject | null
): void;
/**
* Called whenever a name resolution attempt fails.
* @param error Describes how resolution failed
@ -56,36 +60,38 @@ export interface Resolver {
updateResolution(): void;
}
export interface ResolverConstructor {
new(target: string, listener: ResolverListener): Resolver;
new (target: string, listener: ResolverListener): Resolver;
/**
* Get the default authority for a target. This loosely corresponds to that
* target's hostname. Throws an error if this resolver class cannot parse the
* `target`.
* @param target
*/
getDefaultAuthority(target:string): string;
getDefaultAuthority(target: string): string;
}
const registeredResolvers: {[prefix: string]: ResolverConstructor} = {};
const registeredResolvers: { [prefix: string]: ResolverConstructor } = {};
let defaultResolver: ResolverConstructor | null = null;
/**
* Register a resolver class to handle target names prefixed with the `prefix`
* string. This prefix should correspond to a URI scheme name listed in the
* [gRPC Name Resolution document](https://github.com/grpc/grpc/blob/master/doc/naming.md)
* @param prefix
* @param resolverClass
* @param prefix
* @param resolverClass
*/
export function registerResolver(prefix: string, resolverClass: ResolverConstructor) {
export function registerResolver(
prefix: string,
resolverClass: ResolverConstructor
) {
registeredResolvers[prefix] = resolverClass;
}
/**
* Register a default resolver to handle target names that do not start with
* any registered prefix.
* @param resolverClass
* @param resolverClass
*/
export function registerDefaultResolver(resolverClass: ResolverConstructor) {
defaultResolver = resolverClass;
@ -94,10 +100,13 @@ export function registerDefaultResolver(resolverClass: ResolverConstructor) {
/**
* Create a name resolver for the specified target, if possible. Throws an
* error if no such name resolver can be created.
* @param target
* @param listener
* @param target
* @param listener
*/
export function createResolver(target: string, listener: ResolverListener): Resolver {
export function createResolver(
target: string,
listener: ResolverListener
): Resolver {
for (const prefix of Object.keys(registeredResolvers)) {
if (target.startsWith(prefix)) {
return new registeredResolvers[prefix](target, listener);
@ -112,7 +121,7 @@ export function createResolver(target: string, listener: ResolverListener): Reso
/**
* Get the default authority for the specified target, if possible. Throws an
* error if no registered name resolver can parse that target string.
* @param target
* @param target
*/
export function getDefaultAuthority(target: string): string {
for (const prefix of Object.keys(registerDefaultResolver)) {
@ -128,4 +137,4 @@ export function getDefaultAuthority(target: string): string {
export function registerAll() {
resolver_dns.setup();
}
}

View File

@ -15,18 +15,23 @@
*
*/
import { ChannelControlHelper, LoadBalancer, isLoadBalancerNameRegistered, createLoadBalancer } from "./load-balancer";
import { ServiceConfig } from "./service-config";
import { ConnectivityState } from "./channel";
import { createResolver, Resolver } from "./resolver";
import { ServiceError } from "./call";
import { ChannelOptions } from "./channel-options";
import { Picker, UnavailablePicker, QueuePicker } from "./picker";
import { LoadBalancingConfig } from "./load-balancing-config";
import { BackoffTimeout } from "./backoff-timeout";
import { Status } from "./constants";
import { StatusObject } from "./call-stream";
import { Metadata } from "./metadata";
import {
ChannelControlHelper,
LoadBalancer,
isLoadBalancerNameRegistered,
createLoadBalancer,
} from './load-balancer';
import { ServiceConfig } from './service-config';
import { ConnectivityState } from './channel';
import { createResolver, Resolver } from './resolver';
import { ServiceError } from './call';
import { ChannelOptions } from './channel-options';
import { Picker, UnavailablePicker, QueuePicker } from './picker';
import { LoadBalancingConfig } from './load-balancing-config';
import { BackoffTimeout } from './backoff-timeout';
import { Status } from './constants';
import { StatusObject } from './call-stream';
import { Metadata } from './metadata';
const DEFAULT_LOAD_BALANCER_NAME = 'pick_first';
@ -103,10 +108,18 @@ export class ResolvingLoadBalancer implements LoadBalancer {
* In practice, that means using the "pick first" load balancer
* implmentation
*/
constructor (private target: string, private channelControlHelper: ChannelControlHelper, private defaultServiceConfig: ServiceConfig | null) {
constructor(
private target: string,
private channelControlHelper: ChannelControlHelper,
private defaultServiceConfig: ServiceConfig | null
) {
this.updateState(ConnectivityState.IDLE, new QueuePicker(this));
this.innerResolver = createResolver(target, {
onSuccessfulResolution: (addressList: string[], serviceConfig: ServiceConfig | null, serviceConfigError: ServiceError | null) => {
onSuccessfulResolution: (
addressList: string[],
serviceConfig: ServiceConfig | null,
serviceConfigError: ServiceError | null
) => {
let workingServiceConfig: ServiceConfig | null = null;
/* This first group of conditionals implements the algorithm described
* in https://github.com/grpc/proposal/blob/master/A21-service-config-error-handling.md
@ -127,7 +140,7 @@ export class ResolvingLoadBalancer implements LoadBalancer {
this.handleResolutionFailure(serviceConfigError);
} else {
// Step 4.ii.a
workingServiceConfig = this.defaultServiceConfig
workingServiceConfig = this.defaultServiceConfig;
}
} else {
// Step 4.i
@ -141,7 +154,10 @@ export class ResolvingLoadBalancer implements LoadBalancer {
}
let loadBalancerName: string | null = null;
let loadBalancingConfig: LoadBalancingConfig | null = null;
if (workingServiceConfig === null || workingServiceConfig.loadBalancingConfig.length === 0) {
if (
workingServiceConfig === null ||
workingServiceConfig.loadBalancingConfig.length === 0
) {
loadBalancerName = DEFAULT_LOAD_BALANCER_NAME;
} else {
for (const lbConfig of workingServiceConfig.loadBalancingConfig) {
@ -163,39 +179,68 @@ export class ResolvingLoadBalancer implements LoadBalancer {
// There were load balancing configs but none are supported. This counts as a resolution failure
this.handleResolutionFailure({
code: Status.UNAVAILABLE,
details: 'All load balancer options in service config are not compatible',
metadata: new Metadata()
details:
'All load balancer options in service config are not compatible',
metadata: new Metadata(),
});
return;
}
}
if (this.innerLoadBalancer === null) {
this.innerLoadBalancer = createLoadBalancer(loadBalancerName, this.innerChannelControlHelper)!;
this.innerLoadBalancer.updateAddressList(addressList, loadBalancingConfig);
this.innerLoadBalancer = createLoadBalancer(
loadBalancerName,
this.innerChannelControlHelper
)!;
this.innerLoadBalancer.updateAddressList(
addressList,
loadBalancingConfig
);
} else if (this.innerLoadBalancer.getTypeName() === loadBalancerName) {
this.innerLoadBalancer.updateAddressList(addressList, loadBalancingConfig);
this.innerLoadBalancer.updateAddressList(
addressList,
loadBalancingConfig
);
} else {
if (this.pendingReplacementLoadBalancer === null || this.pendingReplacementLoadBalancer.getTypeName() !== loadBalancerName) {
if (
this.pendingReplacementLoadBalancer === null ||
this.pendingReplacementLoadBalancer.getTypeName() !==
loadBalancerName
) {
if (this.pendingReplacementLoadBalancer !== null) {
this.pendingReplacementLoadBalancer.destroy();
}
this.pendingReplacementLoadBalancer = createLoadBalancer(loadBalancerName, this.replacementChannelControlHelper)!;
this.pendingReplacementLoadBalancer = createLoadBalancer(
loadBalancerName,
this.replacementChannelControlHelper
)!;
}
this.pendingReplacementLoadBalancer.updateAddressList(addressList, loadBalancingConfig);
this.pendingReplacementLoadBalancer.updateAddressList(
addressList,
loadBalancingConfig
);
}
},
onError: (error: StatusObject) => {
this.handleResolutionFailure(error);
}
},
});
this.innerChannelControlHelper = {
createSubchannel: (subchannelAddress: string, subchannelArgs: ChannelOptions) => {
return this.channelControlHelper.createSubchannel(subchannelAddress, subchannelArgs);
createSubchannel: (
subchannelAddress: string,
subchannelArgs: ChannelOptions
) => {
return this.channelControlHelper.createSubchannel(
subchannelAddress,
subchannelArgs
);
},
updateState: (connectivityState: ConnectivityState, picker: Picker) => {
this.innerBalancerState = connectivityState;
if (connectivityState !== ConnectivityState.READY && this.pendingReplacementLoadBalancer !== null) {
if (
connectivityState !== ConnectivityState.READY &&
this.pendingReplacementLoadBalancer !== null
) {
this.switchOverReplacementBalancer();
} else {
this.updateState(connectivityState, picker);
@ -211,12 +256,18 @@ export class ResolvingLoadBalancer implements LoadBalancer {
this.innerResolver.updateResolution();
}
}
}
}
},
};
this.replacementChannelControlHelper = {
createSubchannel: (subchannelAddress: string, subchannelArgs: ChannelOptions) => {
return this.channelControlHelper.createSubchannel(subchannelAddress, subchannelArgs);
createSubchannel: (
subchannelAddress: string,
subchannelArgs: ChannelOptions
) => {
return this.channelControlHelper.createSubchannel(
subchannelAddress,
subchannelArgs
);
},
updateState: (connectivityState: ConnectivityState, picker: Picker) => {
this.replacementBalancerState = connectivityState;
@ -233,7 +284,7 @@ export class ResolvingLoadBalancer implements LoadBalancer {
* updateResolution */
this.innerResolver.updateResolution();
}
}
},
};
this.backoffTimeout = new BackoffTimeout(() => {
@ -258,15 +309,23 @@ export class ResolvingLoadBalancer implements LoadBalancer {
private switchOverReplacementBalancer() {
this.innerLoadBalancer!.destroy();
this.innerLoadBalancer = this.pendingReplacementLoadBalancer!;
this.innerLoadBalancer.replaceChannelControlHelper(this.innerChannelControlHelper);
this.innerLoadBalancer.replaceChannelControlHelper(
this.innerChannelControlHelper
);
this.pendingReplacementLoadBalancer = null;
this.innerBalancerState = this.replacementBalancerState;
this.updateState(this.replacementBalancerState, this.replacementBalancerPicker);
this.updateState(
this.replacementBalancerState,
this.replacementBalancerPicker
);
}
private handleResolutionFailure(error: StatusObject) {
if (this.innerLoadBalancer === null) {
this.updateState(ConnectivityState.TRANSIENT_FAILURE, new UnavailablePicker(error));
this.updateState(
ConnectivityState.TRANSIENT_FAILURE,
new UnavailablePicker(error)
);
}
this.backoffTimeout.runOnce();
}
@ -276,10 +335,16 @@ export class ResolvingLoadBalancer implements LoadBalancer {
if (this.innerLoadBalancer !== null) {
this.innerLoadBalancer.exitIdle();
}
this.channelControlHelper.updateState(ConnectivityState.CONNECTING, new QueuePicker(this));
this.channelControlHelper.updateState(
ConnectivityState.CONNECTING,
new QueuePicker(this)
);
}
updateAddressList(addressList: string[], lbConfig: LoadBalancingConfig | null) {
updateAddressList(
addressList: string[],
lbConfig: LoadBalancingConfig | null
) {
throw new Error('updateAddressList not supported on ResolvingLoadBalancer');
}
@ -312,4 +377,4 @@ export class ResolvingLoadBalancer implements LoadBalancer {
replaceChannelControlHelper(channelControlHelper: ChannelControlHelper) {
this.channelControlHelper = channelControlHelper;
}
}
}

View File

@ -22,8 +22,11 @@
* specific object type if the input has the right structure, and throws an
* error otherwise. */
/* The any type is purposely used here. All functions validate their input at
* runtime */
/* tslint:disable:no-any */
import * as lbconfig from './load-balancing-config';
import { isString, isArray, isBoolean, isNumber } from 'util';
import * as os from 'os';
export interface MethodConfigName {
@ -41,11 +44,10 @@ export interface MethodConfig {
export interface ServiceConfig {
loadBalancingPolicy?: string;
loadBalancingConfig: lbconfig.LoadBalancingConfig[]
loadBalancingConfig: lbconfig.LoadBalancingConfig[];
methodConfig: MethodConfig[];
}
export interface ServiceConfigCanaryConfig {
clientLanguage?: string[];
percentage?: number;
@ -66,14 +68,14 @@ const TIMEOUT_REGEX = /^\d+(\.\d{1,9})?s$/;
const CLIENT_LANGUAGE_STRING = 'node';
function validateName(obj: any): MethodConfigName {
if (!('service' in obj) || !isString(obj.service)) {
if (!('service' in obj) || typeof obj.service !== 'string') {
throw new Error('Invalid method config name: invalid service');
}
const result: MethodConfigName = {
service: obj.service
service: obj.service,
};
if ('method' in obj) {
if (isString(obj.method)) {
if (typeof obj.method === 'string') {
result.method = obj.method;
} else {
throw new Error('Invalid method config name: invalid method');
@ -84,34 +86,37 @@ function validateName(obj: any): MethodConfigName {
function validateMethodConfig(obj: any): MethodConfig {
const result: MethodConfig = {
name: []
name: [],
};
if (!('name' in obj) || !isArray(obj.name)) {
if (!('name' in obj) || !Array.isArray(obj.name)) {
throw new Error('Invalid method config: invalid name array');
}
for (const name of obj.name) {
result.name.push(validateName(name));
}
if ('waitForReady' in obj) {
if (!isBoolean(obj.waitForReady)) {
if (typeof obj.waitForReady !== 'boolean') {
throw new Error('Invalid method config: invalid waitForReady');
}
result.waitForReady = obj.waitForReady;
}
if ('timeout' in obj) {
if (!isString(obj.timeout) || !TIMEOUT_REGEX.test(obj.timeout)) {
if (
!(typeof obj.timeout === 'string') ||
!TIMEOUT_REGEX.test(obj.timeout)
) {
throw new Error('Invalid method config: invalid timeout');
}
result.timeout = obj.timeout;
}
if ('maxRequestBytes' in obj) {
if (!isNumber(obj.maxRequestBytes)) {
if (typeof obj.maxRequestBytes !== 'number') {
throw new Error('Invalid method config: invalid maxRequestBytes');
}
result.maxRequestBytes = obj.maxRequestBytes;
}
if ('maxResponseBytes' in obj) {
if (!isNumber(obj.maxResponseBytes)) {
if (typeof obj.maxResponseBytes !== 'number') {
throw new Error('Invalid method config: invalid maxRequestBytes');
}
result.maxResponseBytes = obj.maxResponseBytes;
@ -122,17 +127,17 @@ function validateMethodConfig(obj: any): MethodConfig {
function validateServiceConfig(obj: any): ServiceConfig {
const result: ServiceConfig = {
loadBalancingConfig: [],
methodConfig: []
methodConfig: [],
};
if ('loadBalancingPolicy' in obj) {
if (isString(obj.loadBalancingPolicy)) {
if (typeof obj.loadBalancingPolicy === 'string') {
result.loadBalancingPolicy = obj.loadBalancingPolicy;
} else {
throw new Error('Invalid service config: invalid loadBalancingPolicy');
}
}
if ('loadBalancingConfig' in obj) {
if (isArray(obj.loadBalancingConfig)) {
if (Array.isArray(obj.loadBalancingConfig)) {
for (const config of obj.loadBalancingConfig) {
result.loadBalancingConfig.push(lbconfig.validateConfig(config));
}
@ -141,7 +146,7 @@ function validateServiceConfig(obj: any): ServiceConfig {
}
}
if ('methodConfig' in obj) {
if (isArray(obj.methodConfig)) {
if (Array.isArray(obj.methodConfig)) {
for (const methodConfig of obj.methodConfig) {
result.methodConfig.push(validateMethodConfig(methodConfig));
}
@ -152,8 +157,15 @@ function validateServiceConfig(obj: any): ServiceConfig {
for (const methodConfig of result.methodConfig) {
for (const name of methodConfig.name) {
for (const seenName of seenMethodNames) {
if (name.service === seenName.service && name.method === seenName.method) {
throw new Error(`Invalid service config: duplicate name ${name.service}/${name.method}`);
if (
name.service === seenName.service &&
name.method === seenName.method
) {
throw new Error(
`Invalid service config: duplicate name ${name.service}/${
name.method
}`
);
}
}
seenMethodNames.push(name);
@ -167,16 +179,18 @@ function validateCanaryConfig(obj: any): ServiceConfigCanaryConfig {
throw new Error('Invalid service config choice: missing service config');
}
const result: ServiceConfigCanaryConfig = {
serviceConfig: validateServiceConfig(obj.serviceConfig)
}
serviceConfig: validateServiceConfig(obj.serviceConfig),
};
if ('clientLanguage' in obj) {
if (isArray(obj.clientLanguage)) {
if (Array.isArray(obj.clientLanguage)) {
result.clientLanguage = [];
for (const lang of obj.clientLanguage) {
if (isString(lang)) {
if (typeof lang === 'string') {
result.clientLanguage.push(lang);
} else {
throw new Error('Invalid service config choice: invalid clientLanguage');
throw new Error(
'Invalid service config choice: invalid clientLanguage'
);
}
}
} else {
@ -184,13 +198,15 @@ function validateCanaryConfig(obj: any): ServiceConfigCanaryConfig {
}
}
if ('clientHostname' in obj) {
if (isArray(obj.clientHostname)) {
if (Array.isArray(obj.clientHostname)) {
result.clientHostname = [];
for (const lang of obj.clientHostname) {
if (isString(lang)) {
if (typeof lang === 'string') {
result.clientHostname.push(lang);
} else {
throw new Error('Invalid service config choice: invalid clientHostname');
throw new Error(
'Invalid service config choice: invalid clientHostname'
);
}
}
} else {
@ -198,34 +214,51 @@ function validateCanaryConfig(obj: any): ServiceConfigCanaryConfig {
}
}
if ('percentage' in obj) {
if (isNumber(obj.percentage) && 0 <= obj.percentage && obj.percentage <= 100) {
if (
typeof obj.percentage === 'number' &&
0 <= obj.percentage &&
obj.percentage <= 100
) {
result.percentage = obj.percentage;
} else {
throw new Error('Invalid service config choice: invalid percentage');
}
}
// Validate that no unexpected fields are present
const allowedFields = ['clientLanguage', 'percentage', 'clientHostname', 'serviceConfig'];
const allowedFields = [
'clientLanguage',
'percentage',
'clientHostname',
'serviceConfig',
];
for (const field in obj) {
if (!allowedFields.includes(field)) {
throw new Error(`Invalid service config choice: unexpected field ${field}`);
throw new Error(
`Invalid service config choice: unexpected field ${field}`
);
}
}
return result;
}
function validateAndSelectCanaryConfig(obj: any, percentage: number): ServiceConfig {
if (!isArray(obj)) {
function validateAndSelectCanaryConfig(
obj: any,
percentage: number
): ServiceConfig {
if (!Array.isArray(obj)) {
throw new Error('Invalid service config list');
}
for (const config of obj) {
const validatedConfig = validateCanaryConfig(config);
/* For each field, we check if it is present, then only discard the
* config if the field value does not match the current client */
if (isNumber(validatedConfig.percentage) && percentage > validatedConfig.percentage) {
if (
typeof validatedConfig.percentage === 'number' &&
percentage > validatedConfig.percentage
) {
continue;
}
if (isArray(validatedConfig.clientHostname)) {
if (Array.isArray(validatedConfig.clientHostname)) {
let hostnameMatched = false;
for (const hostname of validatedConfig.clientHostname) {
if (hostname === os.hostname()) {
@ -236,7 +269,7 @@ function validateAndSelectCanaryConfig(obj: any, percentage: number): ServiceCon
continue;
}
}
if (isArray(validatedConfig.clientLanguage)) {
if (Array.isArray(validatedConfig.clientLanguage)) {
let languageMatched = false;
for (const language of validatedConfig.clientLanguage) {
if (language === CLIENT_LANGUAGE_STRING) {
@ -261,7 +294,10 @@ function validateAndSelectCanaryConfig(obj: any, percentage: number): ServiceCon
* @return The service configuration to use, given the percentage value, or null if the service config
* data has a valid format but none of the options match the current client.
*/
export function extractAndSelectServiceConfig(txtRecord: string[][], percentage: number): ServiceConfig | null {
export function extractAndSelectServiceConfig(
txtRecord: string[][],
percentage: number
): ServiceConfig | null {
for (const record of txtRecord) {
if (record.length > 0 && record[0].startsWith('grpc_config=')) {
/* Treat the list of strings in this record as a single string and remove
@ -272,4 +308,4 @@ export function extractAndSelectServiceConfig(txtRecord: string[][], percentage:
}
}
return null;
}
}

View File

@ -15,9 +15,9 @@
*
*/
import { ChannelOptions, channelOptionsEqual } from "./channel-options";
import { Subchannel } from "./subchannel";
import { ChannelCredentials } from "./channel-credentials";
import { ChannelOptions, channelOptionsEqual } from './channel-options';
import { Subchannel } from './subchannel';
import { ChannelCredentials } from './channel-credentials';
// 10 seconds in milliseconds. This value is arbitrary.
/**
@ -27,7 +27,15 @@ import { ChannelCredentials } from "./channel-credentials";
const REF_CHECK_INTERVAL = 10_000;
export class SubchannelPool {
private pool: {[channelTarget: string]: {[subchannelTarget: string]: {channelArguments: ChannelOptions, channelCredentials: ChannelCredentials, subchannel: Subchannel}[]}} = Object.create(null);
private pool: {
[channelTarget: string]: {
[subchannelTarget: string]: Array<{
channelArguments: ChannelOptions;
channelCredentials: ChannelCredentials;
subchannel: Subchannel;
}>;
};
} = Object.create(null);
/**
* A pool of subchannels use for making connections. Subchannels with the
@ -38,13 +46,24 @@ export class SubchannelPool {
constructor(private global: boolean) {
if (global) {
setInterval(() => {
/* These objects are created with Object.create(null), so they do not
* have a prototype, which means that for (... in ...) loops over them
* do not need to be filtered */
// tslint:disable-next-line:forin
for (const channelTarget in this.pool) {
// tslint:disable-next-line:forin
for (const subchannelTarget in this.pool[channelTarget]) {
const subchannelObjArray = this.pool[channelTarget][subchannelTarget];
const subchannelObjArray = this.pool[channelTarget][
subchannelTarget
];
/* For each subchannel in the pool, try to unref it if it has
* exactly one ref (which is the ref from the pool itself). If that
* does happen, remove the subchannel from the pool */
this.pool[channelTarget][subchannelTarget] = subchannelObjArray.filter((value) => !value.subchannel.unrefIfOneRef());
this.pool[channelTarget][
subchannelTarget
] = subchannelObjArray.filter(
value => !value.subchannel.unrefIfOneRef()
);
}
}
/* Currently we do not delete keys with empty values. If that results
@ -57,31 +76,51 @@ export class SubchannelPool {
/**
* Get a subchannel if one already exists with exactly matching parameters.
* Otherwise, create and save a subchannel with those parameters.
* @param channelTarget
* @param subchannelTarget
* @param channelArguments
* @param channelCredentials
* @param channelTarget
* @param subchannelTarget
* @param channelArguments
* @param channelCredentials
*/
getOrCreateSubchannel(channelTarget: string, subchannelTarget: string, channelArguments: ChannelOptions, channelCredentials: ChannelCredentials): Subchannel {
getOrCreateSubchannel(
channelTarget: string,
subchannelTarget: string,
channelArguments: ChannelOptions,
channelCredentials: ChannelCredentials
): Subchannel {
if (channelTarget in this.pool) {
if (subchannelTarget in this.pool[channelTarget]){
if (subchannelTarget in this.pool[channelTarget]) {
const subchannelObjArray = this.pool[channelTarget][subchannelTarget];
for (const subchannelObj of subchannelObjArray) {
if (channelOptionsEqual(channelArguments, subchannelObj.channelArguments) && channelCredentials._equals(subchannelObj.channelCredentials)) {
if (
channelOptionsEqual(
channelArguments,
subchannelObj.channelArguments
) &&
channelCredentials._equals(subchannelObj.channelCredentials)
) {
return subchannelObj.subchannel;
}
}
}
}
// If we get here, no matching subchannel was found
const subchannel = new Subchannel(channelTarget, subchannelTarget, channelArguments, channelCredentials);
const subchannel = new Subchannel(
channelTarget,
subchannelTarget,
channelArguments,
channelCredentials
);
if (!(channelTarget in this.pool)) {
this.pool[channelTarget] = Object.create(null);
}
if (!(subchannelTarget in this.pool[channelTarget])) {
this.pool[channelTarget][subchannelTarget] = [];
}
this.pool[channelTarget][subchannelTarget].push({channelArguments, channelCredentials, subchannel});
this.pool[channelTarget][subchannelTarget].push({
channelArguments,
channelCredentials,
subchannel,
});
if (this.global) {
subchannel.ref();
}
@ -93,7 +132,7 @@ const globalSubchannelPool = new SubchannelPool(true);
/**
* Get either the global subchannel pool, or a new subchannel pool.
* @param global
* @param global
*/
export function getSubchannelPool(global: boolean): SubchannelPool {
if (global) {
@ -101,4 +140,4 @@ export function getSubchannelPool(global: boolean): SubchannelPool {
} else {
return new SubchannelPool(false);
}
}
}

View File

@ -38,7 +38,11 @@ const BACKOFF_JITTER = 0.2;
const KEEPALIVE_TIME_MS = ~(1 << 31);
const KEEPALIVE_TIMEOUT_MS = 20000;
export type ConnectivityStateListener = (subchannel: Subchannel, previousState: ConnectivityState, newState: ConnectivityState) => void;
export type ConnectivityStateListener = (
subchannel: Subchannel,
previousState: ConnectivityState,
newState: ConnectivityState
) => void;
const {
HTTP2_HEADER_AUTHORITY,
@ -51,8 +55,8 @@ const {
/**
* Get a number uniformly at random in the range [min, max)
* @param min
* @param max
* @param min
* @param max
*/
function uniformRandom(min: number, max: number) {
return Math.random() * (max - min) + min;
@ -72,7 +76,7 @@ export class Subchannel {
* Indicates that the subchannel should transition from TRANSIENT_FAILURE to
* CONNECTING instead of IDLE when the backoff timeout ends.
*/
private continueConnecting: boolean = false;
private continueConnecting = false;
/**
* A list of listener functions that will be called whenever the connectivity
* state changes. Will be modified by `addConnectivityStateListener` and
@ -107,11 +111,11 @@ export class Subchannel {
/**
* Tracks calls with references to this subchannel
*/
private callRefcount: number = 0;
private callRefcount = 0;
/**
* Tracks channels and subchannel pools with references to this subchannel
*/
private refcount: number = 0;
private refcount = 0;
/**
* A class representing a connection to a single backend.
@ -123,40 +127,45 @@ export class Subchannel {
* @param credentials The channel credentials used to establish this
* connection
*/
constructor(private channelTarget: string,
constructor(
private channelTarget: string,
private subchannelAddress: string,
private options: ChannelOptions,
private credentials: ChannelCredentials) {
// Build user-agent string.
this.userAgent = [
options['grpc.primary_user_agent'],
`grpc-node-js/${clientVersion}`,
options['grpc.secondary_user_agent'],
]
.filter(e => e)
.join(' '); // remove falsey values first
private credentials: ChannelCredentials
) {
// Build user-agent string.
this.userAgent = [
options['grpc.primary_user_agent'],
`grpc-node-js/${clientVersion}`,
options['grpc.secondary_user_agent'],
]
.filter(e => e)
.join(' '); // remove falsey values first
if ('grpc.keepalive_time_ms' in options) {
this.keepaliveTimeMs = options['grpc.keepalive_time_ms']!;
}
if ('grpc.keepalive_timeout_ms' in options) {
this.keepaliveTimeoutMs = options['grpc.keepalive_timeout_ms']!;
}
this.keepaliveIntervalId = setTimeout(() => {}, 0);
clearTimeout(this.keepaliveIntervalId);
this.keepaliveTimeoutId = setTimeout(() => {}, 0);
clearTimeout(this.keepaliveTimeoutId);
this.backoffTimeout = new BackoffTimeout(() => {
if (this.continueConnecting) {
this.transitionToState([ConnectivityState.TRANSIENT_FAILURE, ConnectivityState.CONNECTING],
ConnectivityState.CONNECTING);
} else {
this.transitionToState([ConnectivityState.TRANSIENT_FAILURE, ConnectivityState.CONNECTING],
ConnectivityState.IDLE);
}
});
if ('grpc.keepalive_time_ms' in options) {
this.keepaliveTimeMs = options['grpc.keepalive_time_ms']!;
}
if ('grpc.keepalive_timeout_ms' in options) {
this.keepaliveTimeoutMs = options['grpc.keepalive_timeout_ms']!;
}
this.keepaliveIntervalId = setTimeout(() => {}, 0);
clearTimeout(this.keepaliveIntervalId);
this.keepaliveTimeoutId = setTimeout(() => {}, 0);
clearTimeout(this.keepaliveTimeoutId);
this.backoffTimeout = new BackoffTimeout(() => {
if (this.continueConnecting) {
this.transitionToState(
[ConnectivityState.TRANSIENT_FAILURE, ConnectivityState.CONNECTING],
ConnectivityState.CONNECTING
);
} else {
this.transitionToState(
[ConnectivityState.TRANSIENT_FAILURE, ConnectivityState.CONNECTING],
ConnectivityState.IDLE
);
}
});
}
/**
* Start a backoff timer with the current nextBackoff timeout
@ -195,7 +204,7 @@ export class Subchannel {
private startConnectingInternal() {
const connectionOptions: http2.SecureClientSessionOptions =
this.credentials._getConnectionOptions() || {};
this.credentials._getConnectionOptions() || {};
let addressScheme = 'http://';
if ('secureContext' in connectionOptions) {
addressScheme = 'https://';
@ -217,20 +226,30 @@ export class Subchannel {
connectionOptions.servername = this.channelTarget;
}
}
this.session = http2.connect(addressScheme + this.subchannelAddress, connectionOptions);
this.session = http2.connect(
addressScheme + this.subchannelAddress,
connectionOptions
);
this.session.unref();
this.session.once('connect', () => {
this.transitionToState([ConnectivityState.CONNECTING], ConnectivityState.READY);
this.transitionToState(
[ConnectivityState.CONNECTING],
ConnectivityState.READY
);
});
this.session.once('close', () => {
this.transitionToState([ConnectivityState.CONNECTING, ConnectivityState.READY],
ConnectivityState.TRANSIENT_FAILURE);
this.transitionToState(
[ConnectivityState.CONNECTING, ConnectivityState.READY],
ConnectivityState.TRANSIENT_FAILURE
);
});
this.session.once('goaway', () => {
this.transitionToState([ConnectivityState.CONNECTING, ConnectivityState.READY],
ConnectivityState.IDLE);
this.transitionToState(
[ConnectivityState.CONNECTING, ConnectivityState.READY],
ConnectivityState.IDLE
);
});
this.session.once('error', (error) => {
this.session.once('error', error => {
/* Do nothing here. Any error should also trigger a close event, which is
* where we want to handle that. */
});
@ -250,7 +269,7 @@ export class Subchannel {
if (oldStates.indexOf(this.connectivityState) === -1) {
return false;
}
let previousState = this.connectivityState;
const previousState = this.connectivityState;
this.connectivityState = newState;
switch (newState) {
case ConnectivityState.READY:
@ -272,6 +291,9 @@ export class Subchannel {
this.stopBackoff();
this.session = null;
this.stopKeepalivePings();
break;
default:
throw new Error(`Invalid state: unknown ConnectivityState ${newState}`);
}
/* We use a shallow copy of the stateListeners array in case a listener
* is removed during this iteration */
@ -289,10 +311,14 @@ export class Subchannel {
/* If no calls, channels, or subchannel pools have any more references to
* this subchannel, we can be sure it will never be used again. */
if (this.callRefcount === 0 && this.refcount === 0) {
this.transitionToState([ConnectivityState.CONNECTING,
ConnectivityState.IDLE,
ConnectivityState.READY],
ConnectivityState.TRANSIENT_FAILURE);
this.transitionToState(
[
ConnectivityState.CONNECTING,
ConnectivityState.IDLE,
ConnectivityState.READY,
],
ConnectivityState.TRANSIENT_FAILURE
);
}
}
@ -338,8 +364,8 @@ export class Subchannel {
* Start a stream on the current session with the given `metadata` as headers
* and then attach it to the `callStream`. Must only be called if the
* subchannel's current connectivity state is READY.
* @param metadata
* @param callStream
* @param metadata
* @param callStream
*/
startCallStream(metadata: Metadata, callStream: Http2CallStream) {
const headers = metadata.toHttp2Headers();
@ -368,7 +394,12 @@ export class Subchannel {
* because the state is not currently IDLE, check if it is
* TRANSIENT_FAILURE, and if so indicate that it should go back to
* connecting after the backoff timer ends. Otherwise do nothing */
if (!this.transitionToState([ConnectivityState.IDLE], ConnectivityState.CONNECTING)) {
if (
!this.transitionToState(
[ConnectivityState.IDLE],
ConnectivityState.CONNECTING
)
) {
if (this.connectivityState === ConnectivityState.TRANSIENT_FAILURE) {
this.continueConnecting = true;
}
@ -385,7 +416,7 @@ export class Subchannel {
/**
* Add a listener function to be called whenever the subchannel's
* connectivity state changes.
* @param listener
* @param listener
*/
addConnectivityStateListener(listener: ConnectivityStateListener) {
this.stateListeners.push(listener);
@ -408,6 +439,9 @@ export class Subchannel {
*/
resetBackoff() {
this.backoffTimeout.reset();
this.transitionToState([ConnectivityState.TRANSIENT_FAILURE], ConnectivityState.CONNECTING);
this.transitionToState(
[ConnectivityState.TRANSIENT_FAILURE],
ConnectivityState.CONNECTING
);
}
}
}

View File

@ -273,7 +273,9 @@ describe('CallStream', () => {
frameLengths: range(0, 20).map(() => 1),
},
].forEach((testCase: { description: string; frameLengths: number[] }) => {
it(`should handle a short message where ${testCase.description}`, done => {
it(`should handle a short message where ${
testCase.description
}`, done => {
const callStream = new Http2CallStream(
'foo',
{} as ChannelImplementation,