Merge pull request #2566 from murgatroid99/grpc-js_load_balancer_channel_args

grpc-js: Pass channel options to LoadBalancer constructors
This commit is contained in:
Michael Lumish 2023-09-06 11:28:22 -07:00 committed by GitHub
commit 71d8118cc7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 110 additions and 78 deletions

View File

@ -88,7 +88,7 @@ const RPC_BEHAVIOR_CHILD_CONFIG = parseLoadBalancingConfig({round_robin: {}});
class RpcBehaviorLoadBalancer implements LoadBalancer { class RpcBehaviorLoadBalancer implements LoadBalancer {
private child: ChildLoadBalancerHandler; private child: ChildLoadBalancerHandler;
private latestConfig: RpcBehaviorLoadBalancingConfig | null = null; private latestConfig: RpcBehaviorLoadBalancingConfig | null = null;
constructor(channelControlHelper: ChannelControlHelper) { constructor(channelControlHelper: ChannelControlHelper, options: grpc.ChannelOptions) {
const childChannelControlHelper = createChildChannelControlHelper(channelControlHelper, { const childChannelControlHelper = createChildChannelControlHelper(channelControlHelper, {
updateState: (connectivityState, picker) => { updateState: (connectivityState, picker) => {
if (connectivityState === grpc.connectivityState.READY && this.latestConfig) { if (connectivityState === grpc.connectivityState.READY && this.latestConfig) {
@ -97,7 +97,7 @@ class RpcBehaviorLoadBalancer implements LoadBalancer {
channelControlHelper.updateState(connectivityState, picker); channelControlHelper.updateState(connectivityState, picker);
} }
}); });
this.child = new ChildLoadBalancerHandler(childChannelControlHelper); this.child = new ChildLoadBalancerHandler(childChannelControlHelper, options);
} }
updateAddressList(endpointList: Endpoint[], lbConfig: TypedLoadBalancingConfig, attributes: { [key: string]: unknown; }): void { updateAddressList(endpointList: Endpoint[], lbConfig: TypedLoadBalancingConfig, attributes: { [key: string]: unknown; }): void {
if (!(lbConfig instanceof RpcBehaviorLoadBalancingConfig)) { if (!(lbConfig instanceof RpcBehaviorLoadBalancingConfig)) {

View File

@ -15,7 +15,7 @@
* *
*/ */
import { connectivityState, status, Metadata, logVerbosity, experimental, LoadBalancingConfig } from '@grpc/grpc-js'; import { connectivityState, status, Metadata, logVerbosity, experimental, LoadBalancingConfig, ChannelOptions } from '@grpc/grpc-js';
import { getSingletonXdsClient, Watcher, XdsClient } from './xds-client'; import { getSingletonXdsClient, Watcher, XdsClient } from './xds-client';
import { Cluster__Output } from './generated/envoy/config/cluster/v3/Cluster'; import { Cluster__Output } from './generated/envoy/config/cluster/v3/Cluster';
import Endpoint = experimental.Endpoint; import Endpoint = experimental.Endpoint;
@ -155,8 +155,8 @@ export class CdsLoadBalancer implements LoadBalancer {
private updatedChild = false; private updatedChild = false;
constructor(private readonly channelControlHelper: ChannelControlHelper) { constructor(private readonly channelControlHelper: ChannelControlHelper, options: ChannelOptions) {
this.childBalancer = new XdsClusterResolverChildPolicyHandler(channelControlHelper); this.childBalancer = new XdsClusterResolverChildPolicyHandler(channelControlHelper, options);
} }
private reportError(errorMessage: string) { private reportError(errorMessage: string) {

View File

@ -15,7 +15,7 @@
* *
*/ */
import { connectivityState as ConnectivityState, status as Status, Metadata, logVerbosity as LogVerbosity, experimental, LoadBalancingConfig } from '@grpc/grpc-js'; import { connectivityState as ConnectivityState, status as Status, Metadata, logVerbosity as LogVerbosity, experimental, LoadBalancingConfig, ChannelOptions } from '@grpc/grpc-js';
import LoadBalancer = experimental.LoadBalancer; import LoadBalancer = experimental.LoadBalancer;
import ChannelControlHelper = experimental.ChannelControlHelper; import ChannelControlHelper = experimental.ChannelControlHelper;
import registerLoadBalancerType = experimental.registerLoadBalancerType; import registerLoadBalancerType = experimental.registerLoadBalancerType;
@ -180,7 +180,7 @@ export class PriorityLoadBalancer implements LoadBalancer {
this.parent.channelControlHelper.requestReresolution(); this.parent.channelControlHelper.requestReresolution();
} }
} }
})); }), parent.options);
this.picker = new QueuePicker(this.childBalancer); this.picker = new QueuePicker(this.childBalancer);
this.startFailoverTimer(); this.startFailoverTimer();
} }
@ -306,7 +306,7 @@ export class PriorityLoadBalancer implements LoadBalancer {
private updatesPaused = false; private updatesPaused = false;
constructor(private channelControlHelper: ChannelControlHelper) {} constructor(private channelControlHelper: ChannelControlHelper, private options: ChannelOptions) {}
private updateState(state: ConnectivityState, picker: Picker) { private updateState(state: ConnectivityState, picker: Picker) {
trace( trace(

View File

@ -15,7 +15,7 @@
* *
*/ */
import { connectivityState as ConnectivityState, status as Status, Metadata, logVerbosity, experimental, LoadBalancingConfig } from "@grpc/grpc-js"; import { connectivityState as ConnectivityState, status as Status, Metadata, logVerbosity, experimental, LoadBalancingConfig, ChannelOptions } from "@grpc/grpc-js";
import { isLocalityEndpoint, LocalityEndpoint } from "./load-balancer-priority"; import { isLocalityEndpoint, LocalityEndpoint } from "./load-balancer-priority";
import TypedLoadBalancingConfig = experimental.TypedLoadBalancingConfig; import TypedLoadBalancingConfig = experimental.TypedLoadBalancingConfig;
import LoadBalancer = experimental.LoadBalancer; import LoadBalancer = experimental.LoadBalancer;
@ -178,7 +178,7 @@ export class WeightedTargetLoadBalancer implements LoadBalancer {
updateState: (connectivityState: ConnectivityState, picker: Picker) => { updateState: (connectivityState: ConnectivityState, picker: Picker) => {
this.updateState(connectivityState, picker); this.updateState(connectivityState, picker);
}, },
})); }), parent.options);
this.picker = new QueuePicker(this.childBalancer); this.picker = new QueuePicker(this.childBalancer);
} }
@ -243,7 +243,7 @@ export class WeightedTargetLoadBalancer implements LoadBalancer {
private targetList: string[] = []; private targetList: string[] = [];
private updatesPaused = false; private updatesPaused = false;
constructor(private channelControlHelper: ChannelControlHelper) {} constructor(private channelControlHelper: ChannelControlHelper, private options: ChannelOptions) {}
private maybeUpdateState() { private maybeUpdateState() {
if (!this.updatesPaused) { if (!this.updatesPaused) {

View File

@ -15,7 +15,7 @@
* *
*/ */
import { experimental, logVerbosity, status as Status, Metadata, connectivityState } from "@grpc/grpc-js"; import { experimental, logVerbosity, status as Status, Metadata, connectivityState, ChannelOptions } from "@grpc/grpc-js";
import { validateXdsServerConfig, XdsServerConfig } from "./xds-bootstrap"; import { validateXdsServerConfig, XdsServerConfig } from "./xds-bootstrap";
import { getSingletonXdsClient, XdsClient, XdsClusterDropStats, XdsClusterLocalityStats } from "./xds-client"; import { getSingletonXdsClient, XdsClient, XdsClusterDropStats, XdsClusterLocalityStats } from "./xds-client";
import { LocalityEndpoint } from "./load-balancer-priority"; import { LocalityEndpoint } from "./load-balancer-priority";
@ -253,7 +253,7 @@ class XdsClusterImplBalancer implements LoadBalancer {
private clusterDropStats: XdsClusterDropStats | null = null; private clusterDropStats: XdsClusterDropStats | null = null;
private xdsClient: XdsClient | null = null; private xdsClient: XdsClient | null = null;
constructor(private readonly channelControlHelper: ChannelControlHelper) { constructor(private readonly channelControlHelper: ChannelControlHelper, options: ChannelOptions) {
this.childBalancer = new ChildLoadBalancerHandler(createChildChannelControlHelper(channelControlHelper, { this.childBalancer = new ChildLoadBalancerHandler(createChildChannelControlHelper(channelControlHelper, {
createSubchannel: (subchannelAddress, subchannelArgs) => { createSubchannel: (subchannelAddress, subchannelArgs) => {
if (!this.xdsClient || !this.latestConfig || !this.lastestEndpointList) { if (!this.xdsClient || !this.latestConfig || !this.lastestEndpointList) {
@ -290,7 +290,7 @@ class XdsClusterImplBalancer implements LoadBalancer {
channelControlHelper.updateState(connectivityState, picker); channelControlHelper.updateState(connectivityState, picker);
} }
} }
})); }), options);
} }
updateAddressList(endpointList: Endpoint[], lbConfig: TypedLoadBalancingConfig, attributes: { [key: string]: unknown; }): void { updateAddressList(endpointList: Endpoint[], lbConfig: TypedLoadBalancingConfig, attributes: { [key: string]: unknown; }): void {
if (!(lbConfig instanceof XdsClusterImplLoadBalancingConfig)) { if (!(lbConfig instanceof XdsClusterImplLoadBalancingConfig)) {

View File

@ -15,7 +15,7 @@
* *
*/ */
import { connectivityState as ConnectivityState, status as Status, experimental, logVerbosity, Metadata, status } from "@grpc/grpc-js/"; import { connectivityState as ConnectivityState, status as Status, experimental, logVerbosity, Metadata, status, ChannelOptions } from "@grpc/grpc-js/";
import TypedLoadBalancingConfig = experimental.TypedLoadBalancingConfig; import TypedLoadBalancingConfig = experimental.TypedLoadBalancingConfig;
import LoadBalancer = experimental.LoadBalancer; import LoadBalancer = experimental.LoadBalancer;
@ -131,7 +131,7 @@ class XdsClusterManager implements LoadBalancer {
updateState: (connectivityState: ConnectivityState, picker: Picker) => { updateState: (connectivityState: ConnectivityState, picker: Picker) => {
this.updateState(connectivityState, picker); this.updateState(connectivityState, picker);
}, },
})); }), parent.options);
this.picker = new QueuePicker(this.childBalancer); this.picker = new QueuePicker(this.childBalancer);
} }
@ -167,7 +167,7 @@ class XdsClusterManager implements LoadBalancer {
// Shutdown is a placeholder value that will never appear in normal operation. // Shutdown is a placeholder value that will never appear in normal operation.
private currentState: ConnectivityState = ConnectivityState.SHUTDOWN; private currentState: ConnectivityState = ConnectivityState.SHUTDOWN;
private updatesPaused = false; private updatesPaused = false;
constructor(private channelControlHelper: ChannelControlHelper) {} constructor(private channelControlHelper: ChannelControlHelper, private options: ChannelOptions) {}
private maybeUpdateState() { private maybeUpdateState() {
if (!this.updatesPaused) { if (!this.updatesPaused) {

View File

@ -15,7 +15,7 @@
* *
*/ */
import { LoadBalancingConfig, Metadata, connectivityState, experimental, logVerbosity, status } from "@grpc/grpc-js"; import { ChannelOptions, LoadBalancingConfig, Metadata, connectivityState, experimental, logVerbosity, status } from "@grpc/grpc-js";
import { registerLoadBalancerType } from "@grpc/grpc-js/build/src/load-balancer"; import { registerLoadBalancerType } from "@grpc/grpc-js/build/src/load-balancer";
import { EXPERIMENTAL_OUTLIER_DETECTION } from "./environment"; import { EXPERIMENTAL_OUTLIER_DETECTION } from "./environment";
import { Locality__Output } from "./generated/envoy/config/core/v3/Locality"; import { Locality__Output } from "./generated/envoy/config/core/v3/Locality";
@ -232,14 +232,14 @@ export class XdsClusterResolver implements LoadBalancer {
private xdsClient: XdsClient | null = null; private xdsClient: XdsClient | null = null;
private childBalancer: ChildLoadBalancerHandler; private childBalancer: ChildLoadBalancerHandler;
constructor(private readonly channelControlHelper: ChannelControlHelper) { constructor(private readonly channelControlHelper: ChannelControlHelper, options: ChannelOptions) {
this.childBalancer = new ChildLoadBalancerHandler(experimental.createChildChannelControlHelper(channelControlHelper, { this.childBalancer = new ChildLoadBalancerHandler(experimental.createChildChannelControlHelper(channelControlHelper, {
requestReresolution: () => { requestReresolution: () => {
for (const entry of this.discoveryMechanismList) { for (const entry of this.discoveryMechanismList) {
entry.resolver?.updateResolution(); entry.resolver?.updateResolution();
} }
} }
})); }), options);
} }
private maybeUpdateChild() { private maybeUpdateChild() {

View File

@ -17,7 +17,7 @@
// https://github.com/grpc/proposal/blob/master/A52-xds-custom-lb-policies.md // https://github.com/grpc/proposal/blob/master/A52-xds-custom-lb-policies.md
import { LoadBalancingConfig, experimental, logVerbosity } from "@grpc/grpc-js"; import { ChannelOptions, LoadBalancingConfig, experimental, logVerbosity } from "@grpc/grpc-js";
import { loadProtosWithOptionsSync } from "@grpc/proto-loader/build/src/util"; import { loadProtosWithOptionsSync } from "@grpc/proto-loader/build/src/util";
import { WeightedTargetRaw } from "./load-balancer-weighted-target"; import { WeightedTargetRaw } from "./load-balancer-weighted-target";
import { isLocalityEndpoint } from "./load-balancer-priority"; import { isLocalityEndpoint } from "./load-balancer-priority";
@ -73,8 +73,8 @@ class XdsWrrLocalityLoadBalancingConfig implements TypedLoadBalancingConfig {
class XdsWrrLocalityLoadBalancer implements LoadBalancer { class XdsWrrLocalityLoadBalancer implements LoadBalancer {
private childBalancer: ChildLoadBalancerHandler; private childBalancer: ChildLoadBalancerHandler;
constructor(private readonly channelControlHelper: ChannelControlHelper) { constructor(private readonly channelControlHelper: ChannelControlHelper, options: ChannelOptions) {
this.childBalancer = new ChildLoadBalancerHandler(channelControlHelper); this.childBalancer = new ChildLoadBalancerHandler(channelControlHelper, options);
} }
updateAddressList(endpointList: Endpoint[], lbConfig: TypedLoadBalancingConfig, attributes: { [key: string]: unknown; }): void { updateAddressList(endpointList: Endpoint[], lbConfig: TypedLoadBalancingConfig, attributes: { [key: string]: unknown; }): void {
if (!(lbConfig instanceof XdsWrrLocalityLoadBalancingConfig)) { if (!(lbConfig instanceof XdsWrrLocalityLoadBalancingConfig)) {

View File

@ -24,7 +24,7 @@ import { XdsServer } from "./xds-server";
import * as assert from 'assert'; import * as assert from 'assert';
import { WrrLocality } from "../src/generated/envoy/extensions/load_balancing_policies/wrr_locality/v3/WrrLocality"; import { WrrLocality } from "../src/generated/envoy/extensions/load_balancing_policies/wrr_locality/v3/WrrLocality";
import { TypedStruct } from "../src/generated/xds/type/v3/TypedStruct"; import { TypedStruct } from "../src/generated/xds/type/v3/TypedStruct";
import { connectivityState, experimental, logVerbosity } from "@grpc/grpc-js"; import { ChannelOptions, connectivityState, experimental, logVerbosity } from "@grpc/grpc-js";
import TypedLoadBalancingConfig = experimental.TypedLoadBalancingConfig; import TypedLoadBalancingConfig = experimental.TypedLoadBalancingConfig;
import LoadBalancer = experimental.LoadBalancer; import LoadBalancer = experimental.LoadBalancer;
@ -83,7 +83,7 @@ const RPC_BEHAVIOR_CHILD_CONFIG = parseLoadBalancingConfig({round_robin: {}});
class RpcBehaviorLoadBalancer implements LoadBalancer { class RpcBehaviorLoadBalancer implements LoadBalancer {
private child: ChildLoadBalancerHandler; private child: ChildLoadBalancerHandler;
private latestConfig: RpcBehaviorLoadBalancingConfig | null = null; private latestConfig: RpcBehaviorLoadBalancingConfig | null = null;
constructor(channelControlHelper: ChannelControlHelper) { constructor(channelControlHelper: ChannelControlHelper, options: ChannelOptions) {
const childChannelControlHelper = createChildChannelControlHelper(channelControlHelper, { const childChannelControlHelper = createChildChannelControlHelper(channelControlHelper, {
updateState: (state, picker) => { updateState: (state, picker) => {
if (state === connectivityState.READY && this.latestConfig) { if (state === connectivityState.READY && this.latestConfig) {
@ -92,7 +92,7 @@ class RpcBehaviorLoadBalancer implements LoadBalancer {
channelControlHelper.updateState(state, picker); channelControlHelper.updateState(state, picker);
} }
}); });
this.child = new ChildLoadBalancerHandler(childChannelControlHelper); this.child = new ChildLoadBalancerHandler(childChannelControlHelper, options);
} }
updateAddressList(endpointList: Endpoint[], lbConfig: TypedLoadBalancingConfig, attributes: { [key: string]: unknown; }): void { updateAddressList(endpointList: Endpoint[], lbConfig: TypedLoadBalancingConfig, attributes: { [key: string]: unknown; }): void {
if (!(lbConfig instanceof RpcBehaviorLoadBalancingConfig)) { if (!(lbConfig instanceof RpcBehaviorLoadBalancingConfig)) {

View File

@ -84,7 +84,10 @@ export class ChildLoadBalancerHandler implements LoadBalancer {
} }
}; };
constructor(private readonly channelControlHelper: ChannelControlHelper) {} constructor(
private readonly channelControlHelper: ChannelControlHelper,
private readonly options: ChannelOptions
) {}
protected configUpdateRequiresNewPolicyInstance( protected configUpdateRequiresNewPolicyInstance(
oldConfig: TypedLoadBalancingConfig, oldConfig: TypedLoadBalancingConfig,
@ -111,7 +114,7 @@ export class ChildLoadBalancerHandler implements LoadBalancer {
this.configUpdateRequiresNewPolicyInstance(this.latestConfig, lbConfig) this.configUpdateRequiresNewPolicyInstance(this.latestConfig, lbConfig)
) { ) {
const newHelper = new this.ChildPolicyHelper(this); const newHelper = new this.ChildPolicyHelper(this);
const newChild = createLoadBalancer(lbConfig, newHelper)!; const newChild = createLoadBalancer(lbConfig, newHelper, this.options)!;
newHelper.setChild(newChild); newHelper.setChild(newChild);
if (this.currentChild === null) { if (this.currentChild === null) {
this.currentChild = newChild; this.currentChild = newChild;

View File

@ -585,7 +585,10 @@ export class OutlierDetectionLoadBalancer implements LoadBalancer {
private ejectionTimer: NodeJS.Timeout; private ejectionTimer: NodeJS.Timeout;
private timerStartTime: Date | null = null; private timerStartTime: Date | null = null;
constructor(channelControlHelper: ChannelControlHelper) { constructor(
channelControlHelper: ChannelControlHelper,
options: ChannelOptions
) {
this.childBalancer = new ChildLoadBalancerHandler( this.childBalancer = new ChildLoadBalancerHandler(
createChildChannelControlHelper(channelControlHelper, { createChildChannelControlHelper(channelControlHelper, {
createSubchannel: ( createSubchannel: (
@ -619,7 +622,8 @@ export class OutlierDetectionLoadBalancer implements LoadBalancer {
channelControlHelper.updateState(connectivityState, picker); channelControlHelper.updateState(connectivityState, picker);
} }
}, },
}) }),
options
); );
this.ejectionTimer = setInterval(() => {}, 0); this.ejectionTimer = setInterval(() => {}, 0);
clearInterval(this.ejectionTimer); clearInterval(this.ejectionTimer);

View File

@ -42,6 +42,7 @@ import {
} from './subchannel-interface'; } from './subchannel-interface';
import { isTcpSubchannelAddress } from './subchannel-address'; import { isTcpSubchannelAddress } from './subchannel-address';
import { isIPv6 } from 'net'; import { isIPv6 } from 'net';
import { ChannelOptions } from './channel-options';
const TRACER_NAME = 'pick_first'; const TRACER_NAME = 'pick_first';
@ -162,6 +163,9 @@ function interleaveAddressFamilies(
return result; return result;
} }
const REPORT_HEALTH_STATUS_OPTION_NAME =
'grpc-node.internal.pick-first.report_health_status';
export class PickFirstLoadBalancer implements LoadBalancer { export class PickFirstLoadBalancer implements LoadBalancer {
/** /**
* The list of subchannels this load balancer is currently attempting to * The list of subchannels this load balancer is currently attempting to
@ -212,6 +216,8 @@ export class PickFirstLoadBalancer implements LoadBalancer {
*/ */
private stickyTransientFailureMode = false; private stickyTransientFailureMode = false;
private reportHealthStatus: boolean;
/** /**
* Load balancer that attempts to connect to each backend in the address list * Load balancer that attempts to connect to each backend in the address list
* in order, and picks the first one that connects, using it for every * in order, and picks the first one that connects, using it for every
@ -221,10 +227,11 @@ export class PickFirstLoadBalancer implements LoadBalancer {
*/ */
constructor( constructor(
private readonly channelControlHelper: ChannelControlHelper, private readonly channelControlHelper: ChannelControlHelper,
private reportHealthStatus = false options: ChannelOptions
) { ) {
this.connectionDelayTimeout = setTimeout(() => {}, 0); this.connectionDelayTimeout = setTimeout(() => {}, 0);
clearTimeout(this.connectionDelayTimeout); clearTimeout(this.connectionDelayTimeout);
this.reportHealthStatus = options[REPORT_HEALTH_STATUS_OPTION_NAME];
} }
private allChildrenHaveReportedTF(): boolean { private allChildrenHaveReportedTF(): boolean {
@ -510,7 +517,8 @@ export class LeafLoadBalancer {
private latestPicker: Picker; private latestPicker: Picker;
constructor( constructor(
private endpoint: Endpoint, private endpoint: Endpoint,
channelControlHelper: ChannelControlHelper channelControlHelper: ChannelControlHelper,
options: ChannelOptions
) { ) {
const childChannelControlHelper = createChildChannelControlHelper( const childChannelControlHelper = createChildChannelControlHelper(
channelControlHelper, channelControlHelper,
@ -524,7 +532,7 @@ export class LeafLoadBalancer {
); );
this.pickFirstBalancer = new PickFirstLoadBalancer( this.pickFirstBalancer = new PickFirstLoadBalancer(
childChannelControlHelper, childChannelControlHelper,
/* reportHealthStatus= */ true { ...options, [REPORT_HEALTH_STATUS_OPTION_NAME]: true }
); );
this.latestPicker = new QueuePicker(this.pickFirstBalancer); this.latestPicker = new QueuePicker(this.pickFirstBalancer);
} }

View File

@ -38,6 +38,7 @@ import {
endpointToString, endpointToString,
} from './subchannel-address'; } from './subchannel-address';
import { LeafLoadBalancer } from './load-balancer-pick-first'; import { LeafLoadBalancer } from './load-balancer-pick-first';
import { ChannelOptions } from './channel-options';
const TRACER_NAME = 'round_robin'; const TRACER_NAME = 'round_robin';
@ -99,7 +100,10 @@ export class RoundRobinLoadBalancer implements LoadBalancer {
private childChannelControlHelper: ChannelControlHelper; private childChannelControlHelper: ChannelControlHelper;
constructor(private readonly channelControlHelper: ChannelControlHelper) { constructor(
private readonly channelControlHelper: ChannelControlHelper,
private readonly options: ChannelOptions
) {
this.childChannelControlHelper = createChildChannelControlHelper( this.childChannelControlHelper = createChildChannelControlHelper(
channelControlHelper, channelControlHelper,
{ {
@ -186,7 +190,12 @@ export class RoundRobinLoadBalancer implements LoadBalancer {
trace('Connect to endpoint list ' + endpointList.map(endpointToString)); trace('Connect to endpoint list ' + endpointList.map(endpointToString));
this.updatesPaused = true; this.updatesPaused = true;
this.children = endpointList.map( this.children = endpointList.map(
endpoint => new LeafLoadBalancer(endpoint, this.childChannelControlHelper) endpoint =>
new LeafLoadBalancer(
endpoint,
this.childChannelControlHelper,
this.options
)
); );
for (const child of this.children) { for (const child of this.children) {
child.startConnecting(); child.startConnecting();

View File

@ -128,7 +128,10 @@ export interface LoadBalancer {
} }
export interface LoadBalancerConstructor { export interface LoadBalancerConstructor {
new (channelControlHelper: ChannelControlHelper): LoadBalancer; new (
channelControlHelper: ChannelControlHelper,
options: ChannelOptions
): LoadBalancer;
} }
export interface TypedLoadBalancingConfig { export interface TypedLoadBalancingConfig {
@ -169,12 +172,14 @@ export function registerDefaultLoadBalancerType(typeName: string) {
export function createLoadBalancer( export function createLoadBalancer(
config: TypedLoadBalancingConfig, config: TypedLoadBalancingConfig,
channelControlHelper: ChannelControlHelper channelControlHelper: ChannelControlHelper,
options: ChannelOptions
): LoadBalancer | null { ): LoadBalancer | null {
const typeName = config.getLoadBalancerName(); const typeName = config.getLoadBalancerName();
if (typeName in registeredLoadBalancerTypes) { if (typeName in registeredLoadBalancerTypes) {
return new registeredLoadBalancerTypes[typeName].LoadBalancer( return new registeredLoadBalancerTypes[typeName].LoadBalancer(
channelControlHelper channelControlHelper,
options
); );
} else { } else {
return null; return null;

View File

@ -149,30 +149,33 @@ export class ResolvingLoadBalancer implements LoadBalancer {
}; };
} }
this.updateState(ConnectivityState.IDLE, new QueuePicker(this)); this.updateState(ConnectivityState.IDLE, new QueuePicker(this));
this.childLoadBalancer = new ChildLoadBalancerHandler({ this.childLoadBalancer = new ChildLoadBalancerHandler(
createSubchannel: {
channelControlHelper.createSubchannel.bind(channelControlHelper), createSubchannel:
requestReresolution: () => { channelControlHelper.createSubchannel.bind(channelControlHelper),
/* If the backoffTimeout is running, we're still backing off from requestReresolution: () => {
* making resolve requests, so we shouldn't make another one here. /* If the backoffTimeout is running, we're still backing off from
* In that case, the backoff timer callback will call * making resolve requests, so we shouldn't make another one here.
* updateResolution */ * In that case, the backoff timer callback will call
if (this.backoffTimeout.isRunning()) { * updateResolution */
this.continueResolving = true; if (this.backoffTimeout.isRunning()) {
} else { this.continueResolving = true;
this.updateResolution(); } else {
} this.updateResolution();
}
},
updateState: (newState: ConnectivityState, picker: Picker) => {
this.latestChildState = newState;
this.latestChildPicker = picker;
this.updateState(newState, picker);
},
addChannelzChild:
channelControlHelper.addChannelzChild.bind(channelControlHelper),
removeChannelzChild:
channelControlHelper.removeChannelzChild.bind(channelControlHelper),
}, },
updateState: (newState: ConnectivityState, picker: Picker) => { channelOptions
this.latestChildState = newState; );
this.latestChildPicker = picker;
this.updateState(newState, picker);
},
addChannelzChild:
channelControlHelper.addChannelzChild.bind(channelControlHelper),
removeChannelzChild:
channelControlHelper.removeChannelzChild.bind(channelControlHelper),
});
this.innerResolver = createResolver( this.innerResolver = createResolver(
target, target,
{ {

View File

@ -116,7 +116,7 @@ describe('pick_first load balancing policy', () => {
), ),
} }
); );
const pickFirst = new PickFirstLoadBalancer(channelControlHelper); const pickFirst = new PickFirstLoadBalancer(channelControlHelper, {});
pickFirst.updateAddressList( pickFirst.updateAddressList(
[{ addresses: [{ host: 'localhost', port: 1 }] }], [{ addresses: [{ host: 'localhost', port: 1 }] }],
config config
@ -135,7 +135,7 @@ describe('pick_first load balancing policy', () => {
), ),
} }
); );
const pickFirst = new PickFirstLoadBalancer(channelControlHelper); const pickFirst = new PickFirstLoadBalancer(channelControlHelper, {});
pickFirst.updateAddressList( pickFirst.updateAddressList(
[ [
{ addresses: [{ host: 'localhost', port: 1 }] }, { addresses: [{ host: 'localhost', port: 1 }] },
@ -157,7 +157,7 @@ describe('pick_first load balancing policy', () => {
), ),
} }
); );
const pickFirst = new PickFirstLoadBalancer(channelControlHelper); const pickFirst = new PickFirstLoadBalancer(channelControlHelper, {});
pickFirst.updateAddressList( pickFirst.updateAddressList(
[ [
{ {
@ -191,7 +191,7 @@ describe('pick_first load balancing policy', () => {
), ),
} }
); );
const pickFirst = new PickFirstLoadBalancer(channelControlHelper); const pickFirst = new PickFirstLoadBalancer(channelControlHelper, {});
pickFirst.updateAddressList( pickFirst.updateAddressList(
[{ addresses: [{ host: 'localhost', port: 1 }] }], [{ addresses: [{ host: 'localhost', port: 1 }] }],
config config
@ -207,7 +207,7 @@ describe('pick_first load balancing policy', () => {
), ),
} }
); );
const pickFirst = new PickFirstLoadBalancer(channelControlHelper); const pickFirst = new PickFirstLoadBalancer(channelControlHelper, {});
pickFirst.updateAddressList( pickFirst.updateAddressList(
[ [
{ addresses: [{ host: 'localhost', port: 1 }] }, { addresses: [{ host: 'localhost', port: 1 }] },
@ -229,7 +229,7 @@ describe('pick_first load balancing policy', () => {
), ),
} }
); );
const pickFirst = new PickFirstLoadBalancer(channelControlHelper); const pickFirst = new PickFirstLoadBalancer(channelControlHelper, {});
pickFirst.updateAddressList( pickFirst.updateAddressList(
[ [
{ addresses: [{ host: 'localhost', port: 1 }] }, { addresses: [{ host: 'localhost', port: 1 }] },
@ -254,7 +254,7 @@ describe('pick_first load balancing policy', () => {
), ),
} }
); );
const pickFirst = new PickFirstLoadBalancer(channelControlHelper); const pickFirst = new PickFirstLoadBalancer(channelControlHelper, {});
pickFirst.updateAddressList( pickFirst.updateAddressList(
[ [
{ addresses: [{ host: 'localhost', port: 1 }] }, { addresses: [{ host: 'localhost', port: 1 }] },
@ -293,7 +293,7 @@ describe('pick_first load balancing policy', () => {
), ),
} }
); );
const pickFirst = new PickFirstLoadBalancer(channelControlHelper); const pickFirst = new PickFirstLoadBalancer(channelControlHelper, {});
pickFirst.updateAddressList( pickFirst.updateAddressList(
[ [
{ addresses: [{ host: 'localhost', port: 1 }] }, { addresses: [{ host: 'localhost', port: 1 }] },
@ -320,7 +320,7 @@ describe('pick_first load balancing policy', () => {
), ),
} }
); );
const pickFirst = new PickFirstLoadBalancer(channelControlHelper); const pickFirst = new PickFirstLoadBalancer(channelControlHelper, {});
pickFirst.updateAddressList( pickFirst.updateAddressList(
[ [
{ addresses: [{ host: 'localhost', port: 1 }] }, { addresses: [{ host: 'localhost', port: 1 }] },
@ -351,7 +351,7 @@ describe('pick_first load balancing policy', () => {
), ),
} }
); );
const pickFirst = new PickFirstLoadBalancer(channelControlHelper); const pickFirst = new PickFirstLoadBalancer(channelControlHelper, {});
pickFirst.updateAddressList( pickFirst.updateAddressList(
[ [
{ addresses: [{ host: 'localhost', port: 1 }] }, { addresses: [{ host: 'localhost', port: 1 }] },
@ -389,7 +389,7 @@ describe('pick_first load balancing policy', () => {
), ),
} }
); );
const pickFirst = new PickFirstLoadBalancer(channelControlHelper); const pickFirst = new PickFirstLoadBalancer(channelControlHelper, {});
pickFirst.updateAddressList( pickFirst.updateAddressList(
[ [
{ addresses: [{ host: 'localhost', port: 1 }] }, { addresses: [{ host: 'localhost', port: 1 }] },
@ -424,7 +424,7 @@ describe('pick_first load balancing policy', () => {
), ),
} }
); );
const pickFirst = new PickFirstLoadBalancer(channelControlHelper); const pickFirst = new PickFirstLoadBalancer(channelControlHelper, {});
pickFirst.updateAddressList( pickFirst.updateAddressList(
[{ addresses: [{ host: 'localhost', port: 1 }] }], [{ addresses: [{ host: 'localhost', port: 1 }] }],
config config
@ -452,7 +452,7 @@ describe('pick_first load balancing policy', () => {
), ),
} }
); );
const pickFirst = new PickFirstLoadBalancer(channelControlHelper); const pickFirst = new PickFirstLoadBalancer(channelControlHelper, {});
pickFirst.updateAddressList( pickFirst.updateAddressList(
[{ addresses: [{ host: 'localhost', port: 1 }] }], [{ addresses: [{ host: 'localhost', port: 1 }] }],
config config
@ -487,7 +487,7 @@ describe('pick_first load balancing policy', () => {
), ),
} }
); );
const pickFirst = new PickFirstLoadBalancer(channelControlHelper); const pickFirst = new PickFirstLoadBalancer(channelControlHelper, {});
pickFirst.updateAddressList( pickFirst.updateAddressList(
[{ addresses: [{ host: 'localhost', port: 1 }] }], [{ addresses: [{ host: 'localhost', port: 1 }] }],
config config
@ -522,7 +522,7 @@ describe('pick_first load balancing policy', () => {
), ),
} }
); );
const pickFirst = new PickFirstLoadBalancer(channelControlHelper); const pickFirst = new PickFirstLoadBalancer(channelControlHelper, {});
pickFirst.updateAddressList( pickFirst.updateAddressList(
[{ addresses: [{ host: 'localhost', port: 1 }] }], [{ addresses: [{ host: 'localhost', port: 1 }] }],
config config
@ -569,7 +569,7 @@ describe('pick_first load balancing policy', () => {
for (let i = 0; i < 10; i++) { for (let i = 0; i < 10; i++) {
endpoints.push({ addresses: [{ host: 'localhost', port: i + 1 }] }); endpoints.push({ addresses: [{ host: 'localhost', port: i + 1 }] });
} }
const pickFirst = new PickFirstLoadBalancer(channelControlHelper); const pickFirst = new PickFirstLoadBalancer(channelControlHelper, {});
/* Pick from 10 subchannels 5 times, with address randomization enabled, /* Pick from 10 subchannels 5 times, with address randomization enabled,
* and verify that at least two different subchannels are picked. The * and verify that at least two different subchannels are picked. The
* probability choosing the same address every time is 1/10,000, which * probability choosing the same address every time is 1/10,000, which
@ -625,7 +625,7 @@ describe('pick_first load balancing policy', () => {
for (let i = 0; i < 10; i++) { for (let i = 0; i < 10; i++) {
endpoints.push({ addresses: [{ host: 'localhost', port: i + 1 }] }); endpoints.push({ addresses: [{ host: 'localhost', port: i + 1 }] });
} }
const pickFirst = new PickFirstLoadBalancer(channelControlHelper); const pickFirst = new PickFirstLoadBalancer(channelControlHelper, {});
pickFirst.updateAddressList(endpoints, config); pickFirst.updateAddressList(endpoints, config);
process.nextTick(() => { process.nextTick(() => {
pickFirst.updateAddressList(endpoints, config); pickFirst.updateAddressList(endpoints, config);