grpc-js: Add ChildLoadBalancerHandler and use it in ResolvingLoadBalancer

This commit is contained in:
Michael Lumish 2020-05-04 14:03:17 -07:00
parent 440d985f1f
commit 90013c695d
6 changed files with 266 additions and 282 deletions

View File

@ -0,0 +1,137 @@
/*
* Copyright 2020 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
import {
LoadBalancer,
ChannelControlHelper,
createLoadBalancer,
} from './load-balancer';
import {
SubchannelAddress,
Subchannel,
} from './subchannel';
import { LoadBalancingConfig } from './load-balancing-config';
import { ChannelOptions } from './channel-options';
import { ConnectivityState } from './channel';
import { Picker } from './picker';
const TYPE_NAME = 'child_load_balancer_helper';
export class ChildLoadBalancerHandler implements LoadBalancer {
private currentChild: LoadBalancer | null = null;
private pendingChild: LoadBalancer | null = null;
private ChildPolicyHelper = class {
private child: LoadBalancer | null = null;
constructor(private parent: ChildLoadBalancerHandler) {}
createSubchannel(subchannelAddress: SubchannelAddress, subchannelArgs: ChannelOptions): Subchannel {
return this.parent.channelControlHelper.createSubchannel(subchannelAddress, subchannelArgs);
}
updateState(connectivityState: ConnectivityState, picker: Picker): void {
if (this.calledByPendingChild()) {
if (connectivityState !== ConnectivityState.READY) {
return;
}
this.parent.currentChild?.destroy();
this.parent.currentChild = this.parent.pendingChild;
this.parent.pendingChild = null;
} else if (!this.calledByCurrentChild()) {
return;
}
this.parent.channelControlHelper.updateState(connectivityState, picker);
}
requestReresolution(): void {
const latestChild = this.parent.pendingChild ?? this.parent.currentChild;
if (this.child === latestChild) {
this.parent.channelControlHelper.requestReresolution();
}
}
setChild(newChild: LoadBalancer) {
this.child = newChild;
}
private calledByPendingChild(): boolean {
return this.child === this.parent.pendingChild;
}
private calledByCurrentChild(): boolean {
return this.child === this.parent.currentChild;
}
}
constructor(private readonly channelControlHelper: ChannelControlHelper) {}
/**
* Prerequisites: lbConfig !== null and lbConfig.name is registered
* @param addressList
* @param lbConfig
* @param attributes
*/
updateAddressList(addressList: SubchannelAddress[], lbConfig: LoadBalancingConfig | null, attributes: { [key: string]: unknown; }): void {
if (lbConfig === null) {
return;
}
let childToUpdate: LoadBalancer;
if (this.currentChild === null || this.currentChild.getTypeName() !== lbConfig.name) {
const newHelper = new this.ChildPolicyHelper(this);
const newChild = createLoadBalancer(lbConfig.name, newHelper)!;
newHelper.setChild(newChild);
if (this.currentChild === null) {
this.currentChild = newChild;
childToUpdate = this.currentChild;
} else {
if (this.pendingChild) {
this.pendingChild.destroy();
}
this.pendingChild = newChild;
childToUpdate = this.pendingChild;
}
} else {
if (this.pendingChild === null) {
childToUpdate = this.currentChild;
} else {
childToUpdate = this.pendingChild;
}
}
childToUpdate.updateAddressList(addressList, lbConfig, attributes);
}
exitIdle(): void {
if (this.currentChild) {
this.currentChild.resetBackoff();
if (this.pendingChild) {
this.pendingChild.resetBackoff();
}
}
}
resetBackoff(): void {
if (this.currentChild) {
this.currentChild.resetBackoff();
if (this.pendingChild) {
this.pendingChild.resetBackoff();
}
}
}
destroy(): void {
if (this.currentChild) {
this.currentChild.destroy();
}
if (this.pendingChild) {
this.pendingChild.destroy();
}
}
getTypeName(): string {
return TYPE_NAME;
}
}

View File

@ -129,7 +129,7 @@ export class PickFirstLoadBalancer implements LoadBalancer {
* @param channelControlHelper `ChannelControlHelper` instance provided by
* this load balancer's owner.
*/
constructor(private channelControlHelper: ChannelControlHelper) {
constructor(private readonly channelControlHelper: ChannelControlHelper) {
this.updateState(ConnectivityState.IDLE, new QueuePicker(this));
this.subchannelStateCounts = {
[ConnectivityState.CONNECTING]: 0,
@ -435,10 +435,6 @@ export class PickFirstLoadBalancer implements LoadBalancer {
getTypeName(): string {
return TYPE_NAME;
}
replaceChannelControlHelper(channelControlHelper: ChannelControlHelper) {
this.channelControlHelper = channelControlHelper;
}
}
export function setup(): void {

View File

@ -94,7 +94,7 @@ export class RoundRobinLoadBalancer implements LoadBalancer {
private currentReadyPicker: RoundRobinPicker | null = null;
constructor(private channelControlHelper: ChannelControlHelper) {
constructor(private readonly channelControlHelper: ChannelControlHelper) {
this.updateState(ConnectivityState.IDLE, new QueuePicker(this));
this.subchannelStateCounts = {
[ConnectivityState.CONNECTING]: 0,
@ -229,11 +229,6 @@ export class RoundRobinLoadBalancer implements LoadBalancer {
getTypeName(): string {
return TYPE_NAME;
}
replaceChannelControlHelper(
channelControlHelper: ChannelControlHelper
): void {
this.channelControlHelper = channelControlHelper;
}
}
export function setup() {

View File

@ -91,11 +91,6 @@ export interface LoadBalancer {
* balancer implementation class was registered with.
*/
getTypeName(): string;
/**
* Replace the existing ChannelControlHelper with a new one
* @param channelControlHelper The new ChannelControlHelper to use from now on
*/
replaceChannelControlHelper(channelControlHelper: ChannelControlHelper): void;
}
export interface LoadBalancerConstructor {
@ -128,6 +123,15 @@ export function isLoadBalancerNameRegistered(typeName: string): boolean {
return typeName in registeredLoadBalancerTypes;
}
export function getFirstUsableConfig(configs: LoadBalancingConfig[]): LoadBalancingConfig | null {
for (const config of configs) {
if (config.name in registeredLoadBalancerTypes) {
return config;
}
}
return null;
}
export function registerAll() {
load_balancer_pick_first.setup();
load_balancer_round_robin.setup();

View File

@ -25,6 +25,8 @@
* runtime */
/* eslint-disable @typescript-eslint/no-explicit-any */
export type PickFirstConfig = {};
export type RoundRobinConfig = {};
export interface XdsConfig {
@ -37,13 +39,58 @@ export interface GrpcLbConfig {
childPolicy: LoadBalancingConfig[];
}
export interface LoadBalancingConfig {
/* Exactly one of these must be set for a config to be valid */
round_robin?: RoundRobinConfig;
xds?: XdsConfig;
grpclb?: GrpcLbConfig;
export interface PriorityChild {
config: LoadBalancingConfig[];
}
export interface PriorityLbConfig {
children: Map<string, PriorityChild>;
priorities: string[];
}
export interface PickFirstLoadBalancingConfig {
name: 'pick_first';
pick_first: PickFirstConfig;
}
export interface RoundRobinLoadBalancingConfig {
name: 'round_robin';
round_robin: RoundRobinConfig;
}
export interface XdsLoadBalancingConfig {
name: 'xds';
xds: XdsConfig;
}
export interface GrpcLbLoadBalancingConfig {
name: 'grpclb';
grpclb: GrpcLbConfig;
}
export interface PriorityLoadBalancingConfig {
name: 'priority';
priority: PriorityLbConfig;
}
export type LoadBalancingConfig = PickFirstLoadBalancingConfig | RoundRobinLoadBalancingConfig | XdsLoadBalancingConfig | GrpcLbLoadBalancingConfig | PriorityLoadBalancingConfig;
export function isRoundRobinLoadBalancingConfig(lbconfig: LoadBalancingConfig): lbconfig is RoundRobinLoadBalancingConfig {
return lbconfig.name === 'round_robin';
}
export function isXdsLoadBalancingConfig(lbconfig: LoadBalancingConfig): lbconfig is XdsLoadBalancingConfig {
return lbconfig.name === 'xds';
}
export function isGrpcLbLoadBalancingConfig(lbconfig: LoadBalancingConfig): lbconfig is GrpcLbLoadBalancingConfig {
return lbconfig.name === 'grpclb';
}
export function isPriorityLoadBalancingConfig(lbconfig: LoadBalancingConfig): lbconfig is PriorityLoadBalancingConfig {
return lbconfig.name === 'priority';
}
/* In these functions we assume the input came from a JSON object. Therefore we
* expect that the prototype is uninteresting and that `in` can be used
* effectively */
@ -97,17 +144,26 @@ export function validateConfig(obj: any): LoadBalancingConfig {
throw new Error('Multiple load balancing policies configured');
}
if (obj['round_robin'] instanceof Object) {
return { round_robin: {} };
return {
name: 'round_robin',
round_robin: {}
};
}
}
if ('xds' in obj) {
if ('grpclb' in obj) {
throw new Error('Multiple load balancing policies configured');
}
return { xds: validateXdsConfig(obj.xds) };
return {
name: 'xds',
xds: validateXdsConfig(obj.xds)
};
}
if ('grpclb' in obj) {
return { grpclb: validateGrpcLbConfig(obj.grpclb) };
return {
name: 'grpclb',
grpclb: validateGrpcLbConfig(obj.grpclb)
};
}
throw new Error('No recognized load balancing policy configured');
}

View File

@ -18,14 +18,12 @@
import {
ChannelControlHelper,
LoadBalancer,
isLoadBalancerNameRegistered,
createLoadBalancer,
getFirstUsableConfig,
} from './load-balancer';
import { ServiceConfig } from './service-config';
import { ConnectivityState } from './channel';
import { createResolver, Resolver } from './resolver';
import { ServiceError } from './call';
import { ChannelOptions } from './channel-options';
import { Picker, UnavailablePicker, QueuePicker } from './picker';
import { LoadBalancingConfig } from './load-balancing-config';
import { BackoffTimeout } from './backoff-timeout';
@ -36,6 +34,7 @@ import * as logging from './logging';
import { LogVerbosity } from './constants';
import { SubchannelAddress } from './subchannel';
import { GrpcUri, uriToString } from './uri-parser';
import { ChildLoadBalancerHandler } from './load-balancer-child-handler';
const TRACER_NAME = 'resolving_load_balancer';
@ -50,19 +49,8 @@ export class ResolvingLoadBalancer implements LoadBalancer {
* The resolver class constructed for the target address.
*/
private innerResolver: Resolver;
/**
* Current internal load balancer used for handling calls.
* Invariant: innerLoadBalancer === null => pendingReplacementLoadBalancer === null.
*/
private innerLoadBalancer: LoadBalancer | null = null;
/**
* The load balancer instance that will be used in place of the current
* `innerLoadBalancer` once either that load balancer loses its connection
* or this one establishes a connection. For use when a new name resolution
* result comes in with a different load balancing configuration, and the
* current `innerLoadBalancer` is still connected.
*/
private pendingReplacementLoadBalancer: LoadBalancer | null = null;
private childLoadBalancer: ChildLoadBalancerHandler;
/**
* This resolving load balancer's current connectivity state.
*/
@ -74,34 +62,6 @@ export class ResolvingLoadBalancer implements LoadBalancer {
* successful resolution explicitly provided a null service config.
*/
private previousServiceConfig: ServiceConfig | null | undefined = undefined;
/**
* The most recently reported connectivity state of the `innerLoadBalancer`.
*/
private innerBalancerState: ConnectivityState = ConnectivityState.IDLE;
private innerBalancerPicker: Picker = new UnavailablePicker();
/**
* The most recent reported state of the pendingReplacementLoadBalancer.
* Starts at IDLE for type simplicity. This should get updated as soon as the
* pendingReplacementLoadBalancer gets constructed.
*/
private replacementBalancerState: ConnectivityState = ConnectivityState.IDLE;
/**
* The picker associated with the replacementBalancerState. Starts as an
* UnavailablePicker for type simplicity. This should get updated as soon as
* the pendingReplacementLoadBalancer gets constructed.
*/
private replacementBalancerPicker: Picker = new UnavailablePicker();
/**
* ChannelControlHelper for the innerLoadBalancer.
*/
private readonly innerChannelControlHelper: ChannelControlHelper;
/**
* ChannelControlHelper for the pendingReplacementLoadBalancer.
*/
private readonly replacementChannelControlHelper: ChannelControlHelper;
/**
* The backoff timer for handling name resolution failures.
@ -127,11 +87,28 @@ export class ResolvingLoadBalancer implements LoadBalancer {
* implmentation
*/
constructor(
private target: GrpcUri,
private channelControlHelper: ChannelControlHelper,
private defaultServiceConfig: ServiceConfig | null
private readonly target: GrpcUri,
private readonly channelControlHelper: ChannelControlHelper,
private readonly defaultServiceConfig: ServiceConfig | null
) {
this.updateState(ConnectivityState.IDLE, new QueuePicker(this));
this.childLoadBalancer = new ChildLoadBalancerHandler({
createSubchannel: channelControlHelper.createSubchannel.bind(channelControlHelper),
requestReresolution: () => {
/* If the backoffTimeout is running, we're still backing off from
* making resolve requests, so we shouldn't make another one here.
* In that case, the backoff timer callback will call
* updateResolution */
if (this.backoffTimeout.isRunning()) {
this.continueResolving = true;
} else {
this.updateResolution();
}
},
updateState: (newState: ConnectivityState, picker: Picker) => {
this.updateState(newState, picker);
}
});
this.innerResolver = createResolver(target, {
onSuccessfulResolution: (
addressList: SubchannelAddress[],
@ -171,227 +148,64 @@ export class ResolvingLoadBalancer implements LoadBalancer {
workingServiceConfig = serviceConfig;
this.previousServiceConfig = serviceConfig;
}
let loadBalancerName: string | null = null;
let loadBalancingConfig: LoadBalancingConfig | null = null;
if (
workingServiceConfig === null ||
workingServiceConfig.loadBalancingConfig.length === 0
) {
loadBalancerName = DEFAULT_LOAD_BALANCER_NAME;
} else {
for (const lbConfig of workingServiceConfig.loadBalancingConfig) {
// Iterating through a oneof looking for whichever one is populated
for (const key in lbConfig) {
if (Object.prototype.hasOwnProperty.call(lbConfig, key)) {
if (isLoadBalancerNameRegistered(key)) {
loadBalancerName = key;
loadBalancingConfig = lbConfig;
break;
}
}
}
if (loadBalancerName !== null) {
break;
}
}
if (loadBalancerName === null) {
// There were load balancing configs but none are supported. This counts as a resolution failure
this.handleResolutionFailure({
code: Status.UNAVAILABLE,
details:
'All load balancer options in service config are not compatible',
metadata: new Metadata(),
});
return;
}
const workingConfigList = workingServiceConfig?.loadBalancingConfig ?? [];
if (workingConfigList.length === 0) {
workingConfigList.push({
name: 'pick_first',
pick_first: {}
});
}
if (this.innerLoadBalancer === null) {
this.innerLoadBalancer = createLoadBalancer(
loadBalancerName,
this.innerChannelControlHelper
)!;
this.innerLoadBalancer.updateAddressList(
addressList,
loadBalancingConfig,
attributes
);
} else if (this.innerLoadBalancer.getTypeName() === loadBalancerName) {
this.innerLoadBalancer.updateAddressList(
addressList,
loadBalancingConfig,
attributes
);
} else {
if (
this.pendingReplacementLoadBalancer === null ||
this.pendingReplacementLoadBalancer.getTypeName() !==
loadBalancerName
) {
if (this.pendingReplacementLoadBalancer !== null) {
this.pendingReplacementLoadBalancer.destroy();
}
this.pendingReplacementLoadBalancer = createLoadBalancer(
loadBalancerName,
this.replacementChannelControlHelper
)!;
}
this.pendingReplacementLoadBalancer.updateAddressList(
addressList,
loadBalancingConfig,
attributes
);
const loadBalancingConfig = getFirstUsableConfig(workingConfigList);
if (loadBalancingConfig === null) {
// There were load balancing configs but none are supported. This counts as a resolution failure
this.handleResolutionFailure({
code: Status.UNAVAILABLE,
details:
'All load balancer options in service config are not compatible',
metadata: new Metadata(),
});
return;
}
this.childLoadBalancer.updateAddressList(addressList, loadBalancingConfig, attributes);
},
onError: (error: StatusObject) => {
this.handleResolutionFailure(error);
},
});
this.innerChannelControlHelper = {
createSubchannel: (
subchannelAddress: SubchannelAddress,
subchannelArgs: ChannelOptions
) => {
return this.channelControlHelper.createSubchannel(
subchannelAddress,
subchannelArgs
);
},
updateState: (connectivityState: ConnectivityState, picker: Picker) => {
this.innerBalancerState = connectivityState;
if (connectivityState === ConnectivityState.IDLE) {
picker = new QueuePicker(this);
}
this.innerBalancerPicker = picker;
if (
connectivityState !== ConnectivityState.READY &&
this.pendingReplacementLoadBalancer !== null
) {
this.switchOverReplacementBalancer();
} else {
if (connectivityState === ConnectivityState.IDLE) {
if (this.innerLoadBalancer) {
this.innerLoadBalancer.destroy();
this.innerLoadBalancer = null;
}
}
this.updateState(connectivityState, picker);
}
},
requestReresolution: () => {
if (this.pendingReplacementLoadBalancer === null) {
/* If the backoffTimeout is running, we're still backing off from
* making resolve requests, so we shouldn't make another one here.
* In that case, the backoff timer callback will call
* updateResolution */
if (this.backoffTimeout.isRunning()) {
this.continueResolving = true;
} else {
this.updateResolution();
}
}
},
};
this.replacementChannelControlHelper = {
createSubchannel: (
subchannelAddress: SubchannelAddress,
subchannelArgs: ChannelOptions
) => {
return this.channelControlHelper.createSubchannel(
subchannelAddress,
subchannelArgs
);
},
updateState: (connectivityState: ConnectivityState, picker: Picker) => {
if (connectivityState === ConnectivityState.IDLE) {
picker = new QueuePicker(this);
}
this.replacementBalancerState = connectivityState;
this.replacementBalancerPicker = picker;
if (connectivityState === ConnectivityState.READY) {
this.switchOverReplacementBalancer();
} else if (connectivityState === ConnectivityState.IDLE) {
if (this.pendingReplacementLoadBalancer) {
this.pendingReplacementLoadBalancer.destroy();
this.pendingReplacementLoadBalancer = null;
}
}
},
requestReresolution: () => {
/* If the backoffTimeout is running, we're still backing off from
* making resolve requests, so we shouldn't make another one here.
* In that case, the backoff timer callback will call
* updateResolution */
if (this.backoffTimeout.isRunning()) {
this.continueResolving = true;
} else {
this.updateResolution();
}
},
};
this.backoffTimeout = new BackoffTimeout(() => {
if (this.continueResolving) {
this.updateResolution();
this.continueResolving = false;
} else {
if (this.innerLoadBalancer === null) {
this.updateState(ConnectivityState.IDLE, new QueuePicker(this));
} else {
this.updateState(this.innerBalancerState, this.innerBalancerPicker);
}
}
});
}
private updateResolution() {
this.innerResolver.updateResolution();
if (
this.innerLoadBalancer === null ||
this.innerBalancerState === ConnectivityState.IDLE
) {
if (this.currentState === ConnectivityState.IDLE) {
this.updateState(ConnectivityState.CONNECTING, new QueuePicker(this));
}
}
private updateState(connectivitystate: ConnectivityState, picker: Picker) {
private updateState(connectivityState: ConnectivityState, picker: Picker) {
trace(
uriToString(this.target) +
' ' +
ConnectivityState[this.currentState] +
' -> ' +
ConnectivityState[connectivitystate]
);
this.currentState = connectivitystate;
this.channelControlHelper.updateState(connectivitystate, picker);
}
/**
* Stop using the current innerLoadBalancer and replace it with the
* pendingReplacementLoadBalancer. Must only be called if both of
* those are currently not null.
*/
private switchOverReplacementBalancer() {
this.innerLoadBalancer!.destroy();
this.innerLoadBalancer = this.pendingReplacementLoadBalancer!;
this.innerLoadBalancer.replaceChannelControlHelper(
this.innerChannelControlHelper
);
this.pendingReplacementLoadBalancer = null;
this.innerBalancerState = this.replacementBalancerState;
this.innerBalancerPicker = this.replacementBalancerPicker;
this.updateState(
this.replacementBalancerState,
this.replacementBalancerPicker
ConnectivityState[connectivityState]
);
// Ensure that this.exitIdle() is called by the picker
if (connectivityState === ConnectivityState.IDLE) {
picker = new QueuePicker(this);
}
this.currentState = connectivityState;
this.channelControlHelper.updateState(connectivityState, picker);
}
private handleResolutionFailure(error: StatusObject) {
if (
this.innerLoadBalancer === null ||
this.innerBalancerState === ConnectivityState.IDLE
) {
if (this.currentState === ConnectivityState.IDLE) {
this.updateState(
ConnectivityState.TRANSIENT_FAILURE,
new UnavailablePicker(error)
@ -401,9 +215,7 @@ export class ResolvingLoadBalancer implements LoadBalancer {
}
exitIdle() {
if (this.innerLoadBalancer !== null) {
this.innerLoadBalancer.exitIdle();
}
this.childLoadBalancer.exitIdle();
if (this.currentState === ConnectivityState.IDLE) {
if (this.backoffTimeout.isRunning()) {
this.continueResolving = true;
@ -423,31 +235,15 @@ export class ResolvingLoadBalancer implements LoadBalancer {
resetBackoff() {
this.backoffTimeout.reset();
if (this.innerLoadBalancer !== null) {
this.innerLoadBalancer.resetBackoff();
}
if (this.pendingReplacementLoadBalancer !== null) {
this.pendingReplacementLoadBalancer.resetBackoff();
}
this.childLoadBalancer.resetBackoff();
}
destroy() {
if (this.innerLoadBalancer !== null) {
this.innerLoadBalancer.destroy();
this.innerLoadBalancer = null;
}
if (this.pendingReplacementLoadBalancer !== null) {
this.pendingReplacementLoadBalancer.destroy();
this.pendingReplacementLoadBalancer = null;
}
this.childLoadBalancer.destroy();
this.updateState(ConnectivityState.SHUTDOWN, new UnavailablePicker());
}
getTypeName() {
return 'resolving_load_balancer';
}
replaceChannelControlHelper(channelControlHelper: ChannelControlHelper) {
this.channelControlHelper = channelControlHelper;
}
}