Add new subchannel and load balancing code

This commit is contained in:
murgatroid99 2019-08-06 10:18:15 -07:00
parent acdd2abfc3
commit e612cd9934
11 changed files with 981 additions and 116 deletions

View File

@ -16,6 +16,7 @@
*/
import { Metadata } from './metadata';
import { Call } from '.';
export interface CallMetadataOptions {
service_url: string;
@ -44,6 +45,14 @@ export abstract class CallCredentials {
*/
abstract compose(callCredentials: CallCredentials): CallCredentials;
/**
* Check whether two call credentials objects are equal. Separate
* SingleCallCredentials with identical metadata generator functions are
* equal.
* @param other The other CallCredentials object to compare with.
*/
abstract _equals(other: CallCredentials): boolean;
/**
* Creates a new CallCredentials object from a given function that generates
* Metadata objects.
@ -81,6 +90,17 @@ class ComposedCallCredentials extends CallCredentials {
compose(other: CallCredentials): CallCredentials {
return new ComposedCallCredentials(this.creds.concat([other]));
}
_equals(other: CallCredentials): boolean {
if (this === other) {
return true;
}
if (other instanceof ComposedCallCredentials) {
return this.creds.every((value, index) => value._equals(other.creds[index]));
} else {
return false;
}
}
}
class SingleCallCredentials extends CallCredentials {
@ -103,7 +123,18 @@ class SingleCallCredentials extends CallCredentials {
compose(other: CallCredentials): CallCredentials {
return new ComposedCallCredentials([this, other]);
}
}
_equals(other: CallCredentials): boolean {
if (this === other) {
return true;
}
if (other instanceof SingleCallCredentials) {
return this.metadataGenerator === other.metadataGenerator;
} else {
return false;
}
}
}
class EmptyCallCredentials extends CallCredentials {
generateMetadata(options: CallMetadataOptions): Promise<Metadata> {
@ -113,4 +144,8 @@ class EmptyCallCredentials extends CallCredentials {
compose(other: CallCredentials): CallCredentials {
return other;
}
_equals(other: CallCredentials): boolean {
return other instanceof EmptyCallCredentials;
}
}

View File

@ -18,6 +18,7 @@
import { ConnectionOptions, createSecureContext, PeerCertificate } from 'tls';
import { CallCredentials } from './call-credentials';
import { Call } from '.';
// tslint:disable-next-line:no-any
function verifyIsBufferOrNull(obj: any, friendlyName: string): void {
@ -47,6 +48,14 @@ export type CheckServerIdentityCallback = (
cert: Certificate
) => Error | undefined;
function bufferOrNullEqual(buf1: Buffer | null, buf2: Buffer | null) {
if (buf1 === null && buf2 === null) {
return true;
} else {
return buf1 !== null && buf2 !== null && buf1.equals(buf2);
}
}
/**
* Additional peer verification options that can be set when creating
* SSL credentials.
@ -97,6 +106,13 @@ export abstract class ChannelCredentials {
*/
abstract _isSecure(): boolean;
/**
* Check whether two channel credentials objects are equal. Two secure
* credentials are equal if they were constructed with the same parameters.
* @param other The other ChannelCredentials Object
*/
abstract _equals(other: ChannelCredentials): boolean;
/**
* Return a new ChannelCredentials instance with a given set of credentials.
* The resulting instance can be used to construct a Channel that communicates
@ -124,21 +140,7 @@ export abstract class ChannelCredentials {
'Certificate chain must be given with accompanying private key'
);
}
const secureContext = createSecureContext({
ca: rootCerts || undefined,
key: privateKey || undefined,
cert: certChain || undefined,
});
const connectionOptions: ConnectionOptions = { secureContext };
if (verifyOptions && verifyOptions.checkServerIdentity) {
connectionOptions.checkServerIdentity = (
host: string,
cert: PeerCertificate
) => {
return verifyOptions.checkServerIdentity!(host, { raw: cert.raw });
};
}
return new SecureChannelCredentialsImpl(connectionOptions);
return new SecureChannelCredentialsImpl(rootCerts || null, privateKey || null, certChain || null, verifyOptions || {});
}
/**
@ -164,27 +166,42 @@ class InsecureChannelCredentialsImpl extends ChannelCredentials {
_isSecure(): boolean {
return false;
}
_equals(other: ChannelCredentials): boolean {
return other instanceof InsecureChannelCredentialsImpl;
}
}
class SecureChannelCredentialsImpl extends ChannelCredentials {
connectionOptions: ConnectionOptions;
constructor(
connectionOptions: ConnectionOptions,
callCredentials?: CallCredentials
private rootCerts: Buffer | null,
private privateKey: Buffer | null,
private certChain: Buffer | null,
private verifyOptions: VerifyOptions
) {
super(callCredentials);
this.connectionOptions = connectionOptions;
super();
const secureContext = createSecureContext({
ca: rootCerts || undefined,
key: privateKey || undefined,
cert: certChain || undefined,
});
this.connectionOptions = { secureContext };
if (verifyOptions && verifyOptions.checkServerIdentity) {
this.connectionOptions.checkServerIdentity = (
host: string,
cert: PeerCertificate
) => {
return verifyOptions.checkServerIdentity!(host, { raw: cert.raw });
};
}
}
compose(callCredentials: CallCredentials): ChannelCredentials {
const combinedCallCredentials = this.callCredentials.compose(
callCredentials
);
return new SecureChannelCredentialsImpl(
this.connectionOptions,
combinedCallCredentials
);
return new ComposedChannelCredentialsImpl(this, combinedCallCredentials);
}
_getConnectionOptions(): ConnectionOptions | null {
@ -193,4 +210,50 @@ class SecureChannelCredentialsImpl extends ChannelCredentials {
_isSecure(): boolean {
return true;
}
_equals(other: ChannelCredentials): boolean {
if (this === other) {
return true;
}
if (other instanceof SecureChannelCredentialsImpl) {
if (!bufferOrNullEqual(this.rootCerts, other.rootCerts)) {
return false;
}
if (!bufferOrNullEqual(this.privateKey, other.privateKey)) {
return false;
}
if (!bufferOrNullEqual(this.certChain, other.certChain)) {
return false;
}
return this.verifyOptions.checkServerIdentity === other.verifyOptions.checkServerIdentity;
} else {
return false;
}
}
}
class ComposedChannelCredentialsImpl extends ChannelCredentials {
constructor (private channelCredentials: SecureChannelCredentialsImpl, callCreds: CallCredentials) {
super(callCreds);
}
compose(callCredentials: CallCredentials) {
const combinedCallCredentials = this.callCredentials.compose(callCredentials);
return new ComposedChannelCredentialsImpl(this.channelCredentials, combinedCallCredentials);
}
_getConnectionOptions(): ConnectionOptions | null {
return this.channelCredentials._getConnectionOptions();
}
_isSecure(): boolean {
return true;
}
_equals(other: ChannelCredentials): boolean {
if (this === other) {
return true;
}
if (other instanceof ComposedChannelCredentialsImpl) {
return this.channelCredentials._equals(other.channelCredentials) && this.callCredentials._equals(other.callCredentials);
} else {
return false;
}
}
}

View File

@ -19,13 +19,13 @@
* An interface that contains options used when initializing a Channel instance.
*/
export interface ChannelOptions {
'grpc.ssl_target_name_override': string;
'grpc.primary_user_agent': string;
'grpc.secondary_user_agent': string;
'grpc.default_authority': string;
'grpc.keepalive_time_ms': number;
'grpc.keepalive_timeout_ms': number;
[key: string]: string | number;
'grpc.ssl_target_name_override'?: string;
'grpc.primary_user_agent'?: string;
'grpc.secondary_user_agent'?: string;
'grpc.default_authority'?: string;
'grpc.keepalive_time_ms'?: number;
'grpc.keepalive_timeout_ms'?: number;
[key: string]: string | number | undefined;
}
/**
@ -40,3 +40,20 @@ export const recognizedOptions = {
'grpc.keepalive_time_ms': true,
'grpc.keepalive_timeout_ms': true,
};
export function channelOptionsEqual(options1: ChannelOptions, options2: ChannelOptions) {
const keys1 = Object.keys(options1).sort();
const keys2 = Object.keys(options2).sort();
if (keys1.length !== keys2.length) {
return false;
}
for (let i = 0; i < keys1.length; i+=1) {
if (keys1[i] !== keys2[i]) {
return false;
}
if (options1[keys1[i]] !== options2[keys2[i]]) {
return false;
}
}
return true;
}

View File

@ -0,0 +1,198 @@
/*
* Copyright 2019 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
import { LoadBalancer, ChannelControlHelper, registerLoadBalancerType } from './load-balancer';
import { ConnectivityState } from './channel';
import { QueuePicker, Picker, PickArgs, CompletePickResult, PickResultType, UnavailablePicker } from './picker';
import { LoadBalancingConfig } from './load-balancing-config';
import { Subchannel, SubchannelConnectivityState, ConnectivityStateListener } from './subchannel';
const TYPE_NAME = 'pick_first';
const CONNECTION_DELAY_INTERVAL_MS = 250;
class PickFirstPicker implements Picker {
constructor(private subchannel: Subchannel) {}
pick(pickArgs: PickArgs) : CompletePickResult {
return {
pickResultType: PickResultType.COMPLETE,
subchannel: this.subchannel,
status: null
}
}
}
export class PickFirstLoadBalancer implements LoadBalancer {
private latestAddressList: string[] = [];
private subchannels: Subchannel[] = [];
private currentState: ConnectivityState = ConnectivityState.IDLE;
private currentSubchannelIndex: number = 0;
private subchannelConnectingCount: number = 0;
private currentPick: Subchannel | null = null;
private subchannelStateListener: ConnectivityStateListener;
private pickedSubchannelStateListener: ConnectivityStateListener;
private connectionDelayTimeout: NodeJS.Timeout;
constructor(private channelControlHelper: ChannelControlHelper) {
this.updateState(ConnectivityState.IDLE, new QueuePicker(this));
this.subchannelStateListener = (subchannel: Subchannel, previousState: SubchannelConnectivityState, newState: SubchannelConnectivityState) => {
if (previousState === SubchannelConnectivityState.CONNECTING) {
this.subchannelConnectingCount -= 1;
}
if (newState === SubchannelConnectivityState.CONNECTING) {
this.subchannelConnectingCount += 1;
}
if (newState === SubchannelConnectivityState.READY) {
this.pickSubchannel(subchannel);
return;
} else {
if (this.currentPick === null) {
if (newState === SubchannelConnectivityState.TRANSIENT_FAILURE || newState === SubchannelConnectivityState.IDLE) {
subchannel.startConnecting();
}
const newLBState = this.subchannelConnectingCount > 0 ? ConnectivityState.CONNECTING : ConnectivityState.TRANSIENT_FAILURE;
if (newLBState !== this.currentState) {
if (newLBState === ConnectivityState.TRANSIENT_FAILURE) {
this.updateState(newLBState, new UnavailablePicker());
} else {
this.updateState(newLBState, new QueuePicker(this));
}
}
}
}
};
this.pickedSubchannelStateListener = (subchannel: Subchannel, previousState: SubchannelConnectivityState, newState: SubchannelConnectivityState) => {
if (newState !== SubchannelConnectivityState.READY) {
subchannel.unref();
subchannel.removeConnectivityStateListener(this.pickedSubchannelStateListener);
if (this.subchannels.length > 0) {
const newLBState = this.subchannelConnectingCount > 0 ? ConnectivityState.CONNECTING : ConnectivityState.TRANSIENT_FAILURE;
if (newLBState === ConnectivityState.TRANSIENT_FAILURE) {
this.updateState(newLBState, new UnavailablePicker());
} else {
this.updateState(newLBState, new QueuePicker(this));
}
} else {
this.connectToAddressList();
this.channelControlHelper.requestReresolution();
}
}
};
this.connectionDelayTimeout = setTimeout(() => {}, 0);
clearTimeout(this.connectionDelayTimeout);
}
private startConnecting(subchannelIndex: number) {
if (this.subchannels[subchannelIndex].getConnectivityState() === SubchannelConnectivityState.IDLE) {
this.subchannels[subchannelIndex].startConnecting();
}
this.connectionDelayTimeout = setTimeout(() => {
for (const [index, subchannel] of this.subchannels.entries()) {
if (index > subchannelIndex) {
const subchannelState = subchannel.getConnectivityState();
if (subchannelState === SubchannelConnectivityState.IDLE || subchannelState === SubchannelConnectivityState.CONNECTING) {
this.startConnecting(index);
return;
}
}
}
}, CONNECTION_DELAY_INTERVAL_MS)
}
private pickSubchannel(subchannel: Subchannel) {
if (this.currentPick !== null) {
this.currentPick.unref();
this.currentPick.removeConnectivityStateListener(this.pickedSubchannelStateListener);
}
this.currentPick = subchannel;
this.updateState(ConnectivityState.READY, new PickFirstPicker(subchannel));
subchannel.addConnectivityStateListener(this.pickedSubchannelStateListener);
subchannel.ref();
this.resetSubchannelList();
clearTimeout(this.connectionDelayTimeout);
}
private updateState(newState: ConnectivityState, picker: Picker) {
this.currentState = newState;
this.channelControlHelper.updateState(newState, picker);
}
private resetSubchannelList() {
for (const subchannel of this.subchannels) {
subchannel.unref();
subchannel.removeConnectivityStateListener(this.subchannelStateListener);
}
this.currentSubchannelIndex = 0;
this.subchannelConnectingCount = 0;
this.subchannels = [];
}
private connectToAddressList(): void {
this.resetSubchannelList();
this.subchannels = this.latestAddressList.map((address) => this.channelControlHelper.createSubchannel(address, {}));
for (const subchannel of this.subchannels) {
subchannel.addConnectivityStateListener(this.subchannelStateListener);
if (subchannel.getConnectivityState() === SubchannelConnectivityState.READY) {
this.pickSubchannel(subchannel);
this.updateState(ConnectivityState.READY, new PickFirstPicker(subchannel));
this.resetSubchannelList();
return;
}
}
for (const [index, subchannel] of this.subchannels.entries()) {
const subchannelState = subchannel.getConnectivityState();
if (subchannelState === SubchannelConnectivityState.IDLE || subchannelState === SubchannelConnectivityState.CONNECTING) {
this.startConnecting(index);
}
}
}
updateAddressList(addressList: string[], lbConfig?: LoadBalancingConfig): void {
// lbConfig has no useful information for pick first load balancing
this.latestAddressList = addressList;
this.connectToAddressList();
}
exitIdle() {
if (this.currentState === ConnectivityState.IDLE) {
if (this.latestAddressList.length > 0) {
this.connectToAddressList();
}
}
}
resetBackoff() {
// I'm not actually sure what this is supposed to do
}
destroy() {
this.resetSubchannelList();
if (this.currentPick !== null) {
this.currentPick.unref();
this.currentPick.removeConnectivityStateListener(this.pickedSubchannelStateListener);
}
}
getTypeName(): string {
return TYPE_NAME;
}
}
export function setup(): void {
registerLoadBalancerType(TYPE_NAME, PickFirstLoadBalancer);
}

View File

@ -0,0 +1,107 @@
/*
* Copyright 2019 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
import { ChannelOptions } from "./channel-options";
import { Subchannel } from "./subchannel";
import { ConnectivityState } from "./channel";
import { Picker } from "./picker";
import { LoadBalancingConfig } from "./load-balancing-config";
/**
* A collection of functions associated with a channel that a load balancer
* can call as necessary.
*/
export interface ChannelControlHelper {
/**
* Returns a subchannel connected to the specified address.
* @param subchannelAddress The address to connect to
* @param subchannelArgs Extra channel arguments specified by the load balancer
*/
createSubchannel(subchannelAddress: String, subchannelArgs: ChannelOptions): Subchannel;
/**
* Passes a new subchannel picker up to the channel. This is called if either
* the connectivity state changes or if a different picker is needed for any
* other reason.
* @param connectivityState New connectivity state
* @param picker New picker
*/
updateState(connectivityState: ConnectivityState, picker: Picker): void;
/**
* Request new data from the resolver.
*/
requestReresolution(): void;
}
/**
* Tracks one or more connected subchannels and determines which subchannel
* each request should use.
*/
export interface LoadBalancer {
/**
* Gives the load balancer a new list of addresses to start connecting to.
* The load balancer will start establishing connections with the new list,
* but will continue using any existing connections until the new connections
* are established
* @param addressList The new list of addresses to connect to
* @param lbConfig The load balancing config object from the service config,
* if one was provided
*/
updateAddressList(addressList: string[], lbConfig?: LoadBalancingConfig): void;
/**
* If the load balancer is currently in the IDLE state, start connecting.
*/
exitIdle(): void;
/**
* If the load balancer is currently in the CONNECTING or TRANSIENT_FAILURE
* state, reset the current connection backoff timeout to its base value and
* transition to CONNECTING if in TRANSIENT_FAILURE.
*/
resetBackoff(): void;
/**
* The load balancer unrefs all of its subchannels and stops calling methods
* of its channel control helper.
*/
destroy(): void;
/**
* Get the type name for this load balancer type. Must be constant across an
* entire load balancer implementation class and must match the name that the
* balancer implementation class was registered with.
*/
getTypeName(): string;
}
export interface LoadBalancerConstructor {
new(channelControlHelper: ChannelControlHelper): LoadBalancer;
}
const registeredLoadBalancerTypes: {[name: string]: LoadBalancerConstructor} = {};
export function registerLoadBalancerType(typeName: string, loadBalancerType: LoadBalancerConstructor) {
registeredLoadBalancerTypes[typeName] = loadBalancerType;
}
export function createLoadBalancer(typeName: string, channelControlHelper: ChannelControlHelper): LoadBalancer | null {
if (typeName in registeredLoadBalancerTypes) {
return new registeredLoadBalancerTypes[typeName](channelControlHelper);
} else {
return null;
}
}
export function isLoadBalancerNameRegistered(typeName: string): boolean {
return typeName in registeredLoadBalancerTypes;
}

View File

@ -0,0 +1,92 @@
/*
* Copyright 2019 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
import { Subchannel } from "./subchannel";
import { StatusObject } from "./call-stream";
import { Metadata } from "./metadata";
import { Status } from "./constants";
import { LoadBalancer } from "./load-balancer";
export enum PickResultType {
COMPLETE,
QUEUE,
TRANSIENT_FAILURE
}
export interface PickResult {
pickResultType: PickResultType,
subchannel: Subchannel | null,
status: StatusObject | null
}
export interface CompletePickResult extends PickResult {
pickResultType: PickResultType.COMPLETE,
subchannel: Subchannel | null,
status: null
}
export interface QueuePickResult extends PickResult {
pickResultType: PickResultType.QUEUE,
subchannel: null,
status: null
}
export interface TransientFailurePickResult extends PickResult {
pickResultType: PickResultType.TRANSIENT_FAILURE,
subchannel: null,
status: StatusObject
}
export interface PickArgs {
metadata: Metadata
}
export interface Picker {
pick(pickArgs: PickArgs): PickResult;
}
export class UnavailablePicker implements Picker {
pick(pickArgs: PickArgs): TransientFailurePickResult {
return {
pickResultType: PickResultType.TRANSIENT_FAILURE,
subchannel: null,
status: {
code: Status.UNAVAILABLE,
details: "No connection established",
metadata: new Metadata()
}
};
}
}
export class QueuePicker {
private calledExitIdle: boolean = false;
// Constructed with a load balancer. Calls exitIdle on it the first time pick is called
constructor(private loadBalancer: LoadBalancer) {}
pick(pickArgs: PickArgs): QueuePickResult {
if (!this.calledExitIdle) {
this.loadBalancer.exitIdle();
this.calledExitIdle = true;
}
return {
pickResultType: PickResultType.QUEUE,
subchannel: null,
status: null
}
}
}

View File

@ -18,6 +18,8 @@ import { Resolver, ResolverListener, registerResolver, registerDefaultResolver }
import * as dns from 'dns';
import * as util from 'util';
import { extractAndSelectServiceConfig, ServiceConfig } from './service-config';
import { ServiceError } from './call';
import { Status } from './constants';
/* These regular expressions match IP addresses with optional ports in different
* formats. In each case, capture group 1 contains the address, and capture
@ -113,14 +115,16 @@ class DnsResolver implements Resolver {
this.pendingResultPromise = null;
const allAddresses: string[] = mergeArrays(AAAArecord, Arecord);
let serviceConfig: ServiceConfig | null = null;
let serviceConfigError: Error | null = null;
let serviceConfigError: ServiceError | null = null;
if (TXTrecord instanceof Error) {
serviceConfigError = TXTrecord;
serviceConfigError = TXTrecord as ServiceError;
serviceConfigError.code = Status.UNAVAILABLE;
} else {
try {
serviceConfig = extractAndSelectServiceConfig(TXTrecord, this.percentage);
} catch (err) {
serviceConfigError = err;
serviceConfigError = err as ServiceError;
serviceConfigError.code = Status.UNAVAILABLE;
}
}
this.listener.onSuccessfulResolution(allAddresses, serviceConfig, serviceConfigError);

View File

@ -19,7 +19,7 @@ import { ServiceError } from "./call";
import { ServiceConfig } from "./service-config";
export interface ResolverListener {
onSuccessfulResolution(addressList: string[], serviceConfig: ServiceConfig | null, serviceConfigError: Error | null): void;
onSuccessfulResolution(addressList: string[], serviceConfig: ServiceConfig | null, serviceConfigError: ServiceError | null): void;
onError(error: ServiceError): void;
}

View File

@ -0,0 +1,92 @@
/*
* Copyright 2019 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
import { ChannelControlHelper, LoadBalancer, isLoadBalancerNameRegistered } from "./load-balancer";
import { ServiceConfig } from "./service-config";
import { ConnectivityState } from "./channel";
import { createResolver, Resolver } from "./resolver";
import { ServiceError } from "./call";
const DEFAULT_LOAD_BALANCER_NAME = 'pick_first';
export class ResolvingLoadBalancer {
private innerResolver: Resolver;
private innerLoadBalancer: LoadBalancer | null = null;
private pendingReplacementLoadBalancer: LoadBalancer | null = null;
private currentState: ConnectivityState = ConnectivityState.IDLE;
/**
* The service config object from the last successful resolution, if
* available. A value of undefined indicates that there has not yet
* been a successful resolution. A value of null indicates that the last
* successful resolution explicitly provided a null service config.
*/
private previousServiceConfig: ServiceConfig | null | undefined = undefined;
constructor (private target: string, private channelControlHelper: ChannelControlHelper, private defaultServiceConfig: ServiceConfig | null) {
this.innerResolver = createResolver(target, {
onSuccessfulResolution: (addressList: string[], serviceConfig: ServiceConfig | null, serviceConfigError: ServiceError | null) => {
let workingServiceConfig: ServiceConfig | null = null;
if (serviceConfig === null) {
if (serviceConfigError === null) {
this.previousServiceConfig = serviceConfig;
workingServiceConfig = this.defaultServiceConfig;
} else {
if (this.defaultServiceConfig === undefined) {
// resolution actually failed
} else {
workingServiceConfig = this.defaultServiceConfig;
}
}
} else {
workingServiceConfig = serviceConfig;
this.previousServiceConfig = serviceConfig;
}
let loadBalancerName: string | null = null;
if (workingServiceConfig === null || workingServiceConfig.loadBalancingConfig.length === 0) {
loadBalancerName = DEFAULT_LOAD_BALANCER_NAME;
} else {
for (const lbConfig of workingServiceConfig.loadBalancingConfig) {
for (const key in lbConfig) {
if (Object.prototype.hasOwnProperty.call(lbConfig, key)) {
if (isLoadBalancerNameRegistered(key)) {
loadBalancerName = key;
break;
}
}
}
if (loadBalancerName !== null) {
break;
}
}
if (loadBalancerName === null) {
// There were load balancing configs but none are supported. This counts as a resolution failure
// TODO: handle error
return;
}
}
},
onError: (error: ServiceError) => {
this.handleResolutionFailure(error);
}
});
}
private handleResolutionFailure(error: ServiceError) {
}
}

View File

@ -0,0 +1,78 @@
/*
* Copyright 2019 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
import { ChannelOptions, channelOptionsEqual } from "./channel-options";
import { Subchannel } from "./subchannel";
import { ChannelCredentials } from "./channel-credentials";
// 10 seconds in milliseconds. This value is arbitrary.
const REF_CHECK_INTERVAL = 10_000;
export class SubchannelPool {
private pool: {[channelTarget: string]: {[subchannelTarget: string]: {channelArguments: ChannelOptions, channelCredentials: ChannelCredentials, subchannel: Subchannel}[]}} = Object.create(null);
constructor(private global: boolean) {
if (global) {
setInterval(() => {
for (const channelTarget in this.pool) {
for (const subchannelTarget in this.pool[channelTarget]) {
const subchannelObjArray = this.pool[channelTarget][subchannelTarget];
/* For each subchannel in the pool, try to unref it if it has
* exactly one ref (which is the ref from the pool itself). If that
* does happen, remove the subchannel from the pool */
this.pool[channelTarget][subchannelTarget] = subchannelObjArray.filter((value) => !value.subchannel.unrefIfOneRef());
}
}
/* Currently we do not delete keys with empty values. If that results
* in significant memory usage we should change it. */
}, REF_CHECK_INTERVAL).unref();
// Unref because this timer should not keep the event loop running
}
}
getOrCreateSubchannel(channelTarget: string, subchannelTarget: string, channelArguments: ChannelOptions, channelCredentials: ChannelCredentials): Subchannel {
if (channelTarget in this.pool) {
if (subchannelTarget in this.pool[channelTarget]){
const subchannelObjArray = this.pool[channelTarget][subchannelTarget];
for (const subchannelObj of subchannelObjArray) {
if (channelOptionsEqual(channelArguments, subchannelObj.channelArguments) && channelCredentials._equals(subchannelObj.channelCredentials)) {
return subchannelObj.subchannel;
}
}
}
}
// If we get here, no matching subchannel was found
const subchannel = new Subchannel(channelTarget, subchannelTarget, channelArguments, channelCredentials);
if (!(channelTarget in this.pool)) {
this.pool[channelTarget] = Object.create(null);
}
if (!(subchannelTarget in this.pool[channelTarget])) {
this.pool[channelTarget][subchannelTarget] = [];
}
this.pool[channelTarget][subchannelTarget].push({channelArguments, channelCredentials, subchannel});
if (this.global) {
subchannel.ref();
}
return subchannel;
}
}
const globalSubchannelPool = new SubchannelPool(true);
export function getOrCreateSubchannel(channelTarget: string, subchannelTarget: string, channelArguments: ChannelOptions, channelCredentials: ChannelCredentials): Subchannel {
return globalSubchannelPool.getOrCreateSubchannel(channelTarget, subchannelTarget, channelArguments, channelCredentials);
}

View File

@ -15,13 +15,35 @@
*
*/
import { EventEmitter } from 'events';
import * as http2 from 'http2';
import * as url from 'url';
import { Call, Http2CallStream } from './call-stream';
import { ChannelOptions } from './channel-options';
import { ChannelCredentials } from './channel-credentials';
import { Metadata } from './metadata';
import { Http2CallStream } from './call-stream';
import { ChannelOptions } from './channel-options';
import { PeerCertificate, checkServerIdentity } from 'tls';
const { version: clientVersion } = require('../../package.json');
const MIN_CONNECT_TIMEOUT_MS = 20000;
const INITIAL_BACKOFF_MS = 1000;
const BACKOFF_MULTIPLIER = 1.6;
const MAX_BACKOFF_MS = 120000;
const BACKOFF_JITTER = 0.2;
/* setInterval and setTimeout only accept signed 32 bit integers. JS doesn't
* have a constant for the max signed 32 bit integer, so this is a simple way
* to calculate it */
const KEEPALIVE_TIME_MS = ~(1 << 31);
const KEEPALIVE_TIMEOUT_MS = 20000;
export enum SubchannelConnectivityState {
READY,
CONNECTING,
TRANSIENT_FAILURE,
IDLE
};
export type ConnectivityStateListener = (subchannel: Subchannel, previousState: SubchannelConnectivityState, newState: SubchannelConnectivityState) => void;
const {
HTTP2_HEADER_AUTHORITY,
@ -32,25 +54,21 @@ const {
HTTP2_HEADER_USER_AGENT,
} = http2.constants;
/* setInterval and setTimeout only accept signed 32 bit integers. JS doesn't
* have a constant for the max signed 32 bit integer, so this is a simple way
* to calculate it */
const KEEPALIVE_TIME_MS = ~(1 << 31);
const KEEPALIVE_TIMEOUT_MS = 20000;
export interface SubChannel extends EventEmitter {
/**
* Attach a call stream to this subchannel's connection to start it
* @param headers The headers to start the stream with
* @param callStream The stream to start
*/
startCallStream(metadata: Metadata, callStream: Call): void;
close(): void;
function uniformRandom(min: number, max: number) {
return Math.random() * (max - min) + min;
}
export class Http2SubChannel extends EventEmitter implements SubChannel {
private session: http2.ClientHttp2Session;
private refCount = 0;
export class Subchannel {
private connectivityState: SubchannelConnectivityState = SubchannelConnectivityState.IDLE;
private session: http2.ClientHttp2Session | null = null;
// Indicates that we should continue conection attempts after backoff time ends
private continueConnecting: boolean = false;
private stateListeners: ConnectivityStateListener[] = [];
private backoffTimerId: NodeJS.Timer;
// The backoff value that will be used the next time we try to connect
private nextBackoff: number = INITIAL_BACKOFF_MS;
private userAgent: string;
private keepaliveTimeMs: number = KEEPALIVE_TIME_MS;
@ -58,73 +76,80 @@ export class Http2SubChannel extends EventEmitter implements SubChannel {
private keepaliveIntervalId: NodeJS.Timer;
private keepaliveTimeoutId: NodeJS.Timer;
constructor(
target: url.URL,
connectionOptions: http2.SecureClientSessionOptions,
userAgent: string,
channelArgs: Partial<ChannelOptions>
) {
super();
this.session = http2.connect(target, connectionOptions);
this.session.unref();
this.session.on('connect', () => {
this.emit('connect');
});
this.session.on('close', () => {
this.stopKeepalivePings();
this.emit('close');
});
this.session.on('error', () => {
this.stopKeepalivePings();
this.emit('close');
});
this.session.on('goaway', () => {
this.stopKeepalivePings();
this.emit('close');
});
this.userAgent = userAgent;
/**
* Tracks calls with references to this subchannel
*/
private callRefcount: number = 0;
/**
* Tracks channels and subchannel pools with references to this subchannel
*/
private refcount: number = 0;
if (channelArgs['grpc.keepalive_time_ms']) {
this.keepaliveTimeMs = channelArgs['grpc.keepalive_time_ms']!;
constructor(private channelTarget: string,
private subchannelAddress: string,
private options: ChannelOptions,
private credentials: ChannelCredentials) {
// Build user-agent string.
this.userAgent = [
options['grpc.primary_user_agent'],
`grpc-node-js/${clientVersion}`,
options['grpc.secondary_user_agent'],
]
.filter(e => e)
.join(' '); // remove falsey values first
/* The only purpose of these lines is to ensure that this.backoffTimerId has
* a value of type NodeJS.Timer. */
this.backoffTimerId = setTimeout(() => {}, 0);
clearTimeout(this.backoffTimerId);
if ('grpc.keepalive_time_ms' in options) {
this.keepaliveTimeMs = options['grpc.keepalive_time_ms']!;
}
if ('grpc.keepalive_timeout_ms' in options) {
this.keepaliveTimeoutMs = options['grpc.keepalive_timeout_ms']!;
}
this.keepaliveIntervalId = setTimeout(() => {}, 0);
clearTimeout(this.keepaliveIntervalId);
this.keepaliveTimeoutId = setTimeout(() => {}, 0);
clearTimeout(this.keepaliveTimeoutId);
}
if (channelArgs['grpc.keepalive_timeout_ms']) {
this.keepaliveTimeoutMs = channelArgs['grpc.keepalive_timeout_ms']!;
}
this.keepaliveIntervalId = setTimeout(() => {}, 0);
clearTimeout(this.keepaliveIntervalId);
this.keepaliveTimeoutId = setTimeout(() => {}, 0);
clearTimeout(this.keepaliveTimeoutId);
/**
* Start a backoff timer with the current nextBackoff timeout
*/
private startBackoff() {
this.backoffTimerId = setTimeout(() => {
if (this.continueConnecting) {
this.transitionToState([SubchannelConnectivityState.TRANSIENT_FAILURE, SubchannelConnectivityState.CONNECTING],
SubchannelConnectivityState.CONNECTING);
} else {
this.transitionToState([SubchannelConnectivityState.TRANSIENT_FAILURE, SubchannelConnectivityState.CONNECTING],
SubchannelConnectivityState.IDLE);
}
}, this.nextBackoff)
const nextBackoff = Math.min(this.nextBackoff * BACKOFF_MULTIPLIER, MAX_BACKOFF_MS);
const jitterMagnitude = nextBackoff * BACKOFF_JITTER;
this.nextBackoff = nextBackoff + uniformRandom(-jitterMagnitude, jitterMagnitude);
}
private ref() {
if (this.refCount === 0) {
this.session.ref();
this.startKeepalivePings();
}
this.refCount += 1;
}
private unref() {
this.refCount -= 1;
if (this.refCount === 0) {
this.session.unref();
this.stopKeepalivePings();
}
private stopBackoff() {
clearTimeout(this.backoffTimerId);
this.nextBackoff = INITIAL_BACKOFF_MS;
}
private sendPing() {
this.keepaliveTimeoutId = setTimeout(() => {
this.emit('close');
// Not sure what to do when keepalive pings fail
}, this.keepaliveTimeoutMs);
this.session.ping(
this.session!.ping(
(err: Error | null, duration: number, payload: Buffer) => {
clearTimeout(this.keepaliveTimeoutId);
}
);
}
/* TODO(murgatroid99): refactor subchannels so that keepalives can be handled
* per subchannel */
private startKeepalivePings() {
this.keepaliveIntervalId = setInterval(() => {
this.sendPing();
@ -137,7 +162,133 @@ export class Http2SubChannel extends EventEmitter implements SubChannel {
clearTimeout(this.keepaliveTimeoutId);
}
// Prerequisite: this subchannel is connected
private startConnectingInternal() {
const connectionOptions: http2.SecureClientSessionOptions =
this.credentials._getConnectionOptions() || {};
if (connectionOptions.secureContext !== null) {
// If provided, the value of grpc.ssl_target_name_override should be used
// to override the target hostname when checking server identity.
// This option is used for testing only.
if (this.options['grpc.ssl_target_name_override']) {
const sslTargetNameOverride = this.options[
'grpc.ssl_target_name_override'
]!;
connectionOptions.checkServerIdentity = (
host: string,
cert: PeerCertificate
): Error | undefined => {
return checkServerIdentity(sslTargetNameOverride, cert);
};
connectionOptions.servername = sslTargetNameOverride;
}
}
this.session = http2.connect(this.subchannelAddress, connectionOptions);
this.session.unref();
this.session.once('connect', () => {
this.transitionToState([SubchannelConnectivityState.CONNECTING], SubchannelConnectivityState.READY);
});
this.session.once('close', () => {
this.transitionToState([SubchannelConnectivityState.CONNECTING, SubchannelConnectivityState.READY],
SubchannelConnectivityState.TRANSIENT_FAILURE);
});
this.session.once('goaway', () => {
this.transitionToState([SubchannelConnectivityState.CONNECTING, SubchannelConnectivityState.READY],
SubchannelConnectivityState.IDLE);
});
}
/**
* Initiate a state transition from any element of oldStates to the new
* state. If the current connectivityState is not in oldStates, do nothing.
* @param oldStates The set of states to transition from
* @param newState The state to transition to
* @returns True if the state changed, false otherwise
*/
private transitionToState(
oldStates: SubchannelConnectivityState[],
newState: SubchannelConnectivityState
): boolean {
if (oldStates.indexOf(this.connectivityState) === -1) {
return false;
}
let previousState = this.connectivityState;
this.connectivityState = newState;
switch (newState) {
case SubchannelConnectivityState.READY:
this.stopBackoff();
break;
case SubchannelConnectivityState.CONNECTING:
this.startBackoff();
this.startConnectingInternal();
this.continueConnecting = false;
break;
case SubchannelConnectivityState.TRANSIENT_FAILURE:
this.session = null;
break;
case SubchannelConnectivityState.IDLE:
/* Stopping the backoff timer here is probably redundant because we
* should only transition to the IDLE state as a result of the timer
* ending, but we still want to reset the backoff timeout. */
this.stopBackoff();
this.session = null;
}
/* We use a shallow copy of the stateListeners array in case a listener
* is removed during this iteration */
for (const listener of [...this.stateListeners]) {
listener(this, previousState, newState);
}
return true;
}
private checkBothRefcounts() {
/* If no calls, channels, or subchannel pools have any more references to
* this subchannel, we can be sure it will never be used again. */
if (this.callRefcount === 0 && this.refcount === 0) {
this.transitionToState([SubchannelConnectivityState.CONNECTING,
SubchannelConnectivityState.IDLE,
SubchannelConnectivityState.READY],
SubchannelConnectivityState.TRANSIENT_FAILURE);
}
}
private callRef() {
if (this.callRefcount === 0) {
if (this.session) {
this.session.ref();
}
this.startKeepalivePings();
}
this.callRefcount += 1;
}
private callUnref() {
this.callRefcount -= 1;
if (this.callRefcount === 0) {
if (this.session) {
this.session.unref();
}
this.stopKeepalivePings();
this.checkBothRefcounts();
}
}
ref() {
this.refcount += 1;
}
unref() {
this.refcount -= 1;
this.checkBothRefcounts();
}
unrefIfOneRef(): boolean {
if (this.refcount === 1) {
this.unref();
return true;
}
return false;
}
startCallStream(metadata: Metadata, callStream: Http2CallStream) {
const headers = metadata.toHttp2Headers();
headers[HTTP2_HEADER_AUTHORITY] = callStream.getHost();
@ -146,15 +297,43 @@ export class Http2SubChannel extends EventEmitter implements SubChannel {
headers[HTTP2_HEADER_METHOD] = 'POST';
headers[HTTP2_HEADER_PATH] = callStream.getMethod();
headers[HTTP2_HEADER_TE] = 'trailers';
const http2Stream = this.session.request(headers);
this.ref();
const http2Stream = this.session!.request(headers);
this.callRef();
http2Stream.on('close', () => {
this.unref();
this.callUnref();
});
callStream.attachHttp2Stream(http2Stream);
}
close() {
this.session.close();
startConnecting() {
/* First, try to transition from IDLE to connecting. If that doesn't happen
* because the state is not currently IDLE, check if it is
* TRANSIENT_FAILURE, and if so indicate that it should go back to
* connecting after the backoff timer ends. Otherwise do nothing */
if (!this.transitionToState([SubchannelConnectivityState.IDLE], SubchannelConnectivityState.CONNECTING)) {
if (this.connectivityState === SubchannelConnectivityState.TRANSIENT_FAILURE) {
this.continueConnecting = true;
}
}
}
}
getConnectivityState() {
return this.connectivityState;
}
addConnectivityStateListener(listener: ConnectivityStateListener) {
this.stateListeners.push(listener);
}
removeConnectivityStateListener(listener: ConnectivityStateListener) {
const listenerIndex = this.stateListeners.indexOf(listener);
if (listenerIndex > -1) {
this.stateListeners.splice(listenerIndex, 1);
}
}
resetBackoff() {
this.nextBackoff = INITIAL_BACKOFF_MS;
this.transitionToState([SubchannelConnectivityState.TRANSIENT_FAILURE], SubchannelConnectivityState.CONNECTING);
}
}