Merge pull request #2854 from murgatroid99/grpc-js_lb_child_helper_channel_args

grpc-js: Pass channel args to LB policies with updates
This commit is contained in:
Michael Lumish 2024-11-20 13:56:06 -08:00 committed by GitHub
commit f44b5e50e8
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
20 changed files with 212 additions and 161 deletions

View File

@ -41,6 +41,7 @@ import PickResult = grpc.experimental.PickResult;
import PickResultType = grpc.experimental.PickResultType;
import createChildChannelControlHelper = grpc.experimental.createChildChannelControlHelper;
import parseLoadBalancingConfig = grpc.experimental.parseLoadBalancingConfig;
import { ChannelOptions } from '@grpc/grpc-js';
grpc_xds.register();
@ -88,7 +89,7 @@ const RPC_BEHAVIOR_CHILD_CONFIG = parseLoadBalancingConfig({round_robin: {}});
class RpcBehaviorLoadBalancer implements LoadBalancer {
private child: ChildLoadBalancerHandler;
private latestConfig: RpcBehaviorLoadBalancingConfig | null = null;
constructor(channelControlHelper: ChannelControlHelper, options: grpc.ChannelOptions) {
constructor(channelControlHelper: ChannelControlHelper) {
const childChannelControlHelper = createChildChannelControlHelper(channelControlHelper, {
updateState: (connectivityState, picker) => {
if (connectivityState === grpc.connectivityState.READY && this.latestConfig) {
@ -97,14 +98,14 @@ class RpcBehaviorLoadBalancer implements LoadBalancer {
channelControlHelper.updateState(connectivityState, picker);
}
});
this.child = new ChildLoadBalancerHandler(childChannelControlHelper, options);
this.child = new ChildLoadBalancerHandler(childChannelControlHelper);
}
updateAddressList(endpointList: Endpoint[], lbConfig: TypedLoadBalancingConfig, attributes: { [key: string]: unknown; }): void {
updateAddressList(endpointList: Endpoint[], lbConfig: TypedLoadBalancingConfig, options: ChannelOptions): void {
if (!(lbConfig instanceof RpcBehaviorLoadBalancingConfig)) {
return;
}
this.latestConfig = lbConfig;
this.child.updateAddressList(endpointList, RPC_BEHAVIOR_CHILD_CONFIG, attributes);
this.child.updateAddressList(endpointList, RPC_BEHAVIOR_CHILD_CONFIG, options);
}
exitIdle(): void {
this.child.exitIdle();

View File

@ -30,6 +30,7 @@ import { XdsConfig } from './xds-dependency-manager';
import { LocalityEndpoint, PriorityChildRaw } from './load-balancer-priority';
import { Locality__Output } from './generated/envoy/config/core/v3/Locality';
import { AGGREGATE_CLUSTER_BACKWARDS_COMPAT, EXPERIMENTAL_OUTLIER_DETECTION } from './environment';
import { XDS_CONFIG_KEY } from './resolver-xds';
const TRACER_NAME = 'cds_balancer';
@ -91,6 +92,8 @@ export function localityToName(locality: Locality__Output) {
return `{region=${locality.region},zone=${locality.zone},sub_zone=${locality.sub_zone}}`;
}
export const ROOT_CLUSTER_KEY = 'grpc.internal.root_cluster';
export class CdsLoadBalancer implements LoadBalancer {
private childBalancer: ChildLoadBalancerHandler;
@ -99,8 +102,8 @@ export class CdsLoadBalancer implements LoadBalancer {
private priorityNames: string[] = [];
private nextPriorityChildNumber = 0;
constructor(private readonly channelControlHelper: ChannelControlHelper, options: ChannelOptions) {
this.childBalancer = new ChildLoadBalancerHandler(channelControlHelper, options);
constructor(private readonly channelControlHelper: ChannelControlHelper) {
this.childBalancer = new ChildLoadBalancerHandler(channelControlHelper);
}
private getNextPriorityName(cluster: string) {
@ -110,14 +113,14 @@ export class CdsLoadBalancer implements LoadBalancer {
updateAddressList(
endpointList: Endpoint[],
lbConfig: TypedLoadBalancingConfig,
attributes: { [key: string]: unknown }
options: ChannelOptions
): void {
if (!(lbConfig instanceof CdsLoadBalancingConfig)) {
trace('Discarding address list update with unrecognized config ' + JSON.stringify(lbConfig, undefined, 2));
return;
}
trace('Received update with config ' + JSON.stringify(lbConfig, undefined, 2));
const xdsConfig = attributes.xdsConfig as XdsConfig;
const xdsConfig = options[XDS_CONFIG_KEY] as XdsConfig;
const clusterName = lbConfig.getCluster();
const maybeClusterConfig = xdsConfig.clusters.get(clusterName);
if (!maybeClusterConfig) {
@ -165,7 +168,7 @@ export class CdsLoadBalancer implements LoadBalancer {
this.channelControlHelper.updateState(connectivityState.TRANSIENT_FAILURE, new UnavailablePicker({code: status.UNAVAILABLE, details: `LB policy config parsing failed with error ${(e as Error).message}`, metadata: new Metadata()}));
return;
}
this.childBalancer.updateAddressList(endpointList, typedChildConfig, {...attributes, rootCluster: clusterName});
this.childBalancer.updateAddressList(endpointList, typedChildConfig, {...options, [ROOT_CLUSTER_KEY]: clusterName});
} else {
if (!clusterConfig.children.endpoints) {
trace('Received update with no resolved endpoints for cluster ' + clusterName);
@ -180,8 +183,8 @@ export class CdsLoadBalancer implements LoadBalancer {
if (clusterConfig.cluster.type === 'EDS') {
endpointPickingPolicy = clusterConfig.cluster.lbPolicyConfig;
if (AGGREGATE_CLUSTER_BACKWARDS_COMPAT) {
if (typeof attributes.rootCluster === 'string') {
const maybeRootClusterConfig = xdsConfig.clusters.get(attributes.rootCluster);
if (typeof options[ROOT_CLUSTER_KEY] === 'string') {
const maybeRootClusterConfig = xdsConfig.clusters.get(options[ROOT_CLUSTER_KEY]);
if (maybeRootClusterConfig?.success) {
endpointPickingPolicy = maybeRootClusterConfig.value.cluster.lbPolicyConfig;
}
@ -279,7 +282,7 @@ export class CdsLoadBalancer implements LoadBalancer {
return;
}
trace(JSON.stringify(typedChildConfig.toJsonObject(), undefined, 2));
this.childBalancer.updateAddressList(childEndpointList, typedChildConfig, attributes);
this.childBalancer.updateAddressList(childEndpointList, typedChildConfig, options);
}
}
exitIdle(): void {

View File

@ -197,7 +197,7 @@ export class PriorityLoadBalancer implements LoadBalancer {
this.parent.channelControlHelper.requestReresolution();
}
}
}), parent.options);
}));
this.picker = new QueuePicker(this.childBalancer);
this.startFailoverTimer();
}
@ -307,7 +307,7 @@ export class PriorityLoadBalancer implements LoadBalancer {
* The attributes object from the latest update, saved to be passed along to
* each new child as they are created
*/
private latestAttributes: { [key: string]: unknown } = {};
private latestOptions: ChannelOptions = {};
/**
* The latest load balancing policies and address lists for each child from
* the latest update
@ -323,7 +323,7 @@ export class PriorityLoadBalancer implements LoadBalancer {
private updatesPaused = false;
constructor(private channelControlHelper: ChannelControlHelper, private options: ChannelOptions) {}
constructor(private channelControlHelper: ChannelControlHelper) {}
private updateState(state: ConnectivityState, picker: Picker) {
trace(
@ -392,7 +392,7 @@ export class PriorityLoadBalancer implements LoadBalancer {
child.updateAddressList(
childUpdate.subchannelAddress,
childUpdate.lbConfig,
this.latestAttributes
this.latestOptions
);
} else {
/* We're going to try to use this child, so reactivate it if it has been
@ -431,7 +431,7 @@ export class PriorityLoadBalancer implements LoadBalancer {
updateAddressList(
endpointList: Endpoint[],
lbConfig: TypedLoadBalancingConfig,
attributes: { [key: string]: unknown }
options: ChannelOptions
): void {
if (!(lbConfig instanceof PriorityLoadBalancingConfig)) {
// Reject a config of the wrong type
@ -467,7 +467,7 @@ export class PriorityLoadBalancer implements LoadBalancer {
}
childAddressList.push(childAddress);
}
this.latestAttributes = attributes;
this.latestOptions = options;
this.latestUpdates.clear();
this.priorities = lbConfig.getPriorities();
this.updatesPaused = true;
@ -486,7 +486,7 @@ export class PriorityLoadBalancer implements LoadBalancer {
existingChild.updateAddressList(
childAddresses,
childConfig.config,
attributes
options
);
}
}

View File

@ -225,8 +225,7 @@ class RingHashLoadBalancer implements LoadBalancer {
private updatesPaused = false;
private currentState: connectivityState = connectivityState.IDLE;
private ring: RingEntry[] = [];
private ringHashSizeCap = DEFAULT_RING_SIZE_CAP;
constructor(private channelControlHelper: ChannelControlHelper, private options: ChannelOptions) {
constructor(private channelControlHelper: ChannelControlHelper) {
this.childChannelControlHelper = createChildChannelControlHelper(
channelControlHelper,
{
@ -254,9 +253,6 @@ class RingHashLoadBalancer implements LoadBalancer {
},
}
);
if (options['grpc.lb.ring_hash.ring_size_cap'] !== undefined) {
this.ringHashSizeCap = options['grpc.lb.ring_hash.ring_size_cap'];
}
}
private calculateAndUpdateState() {
@ -316,7 +312,8 @@ class RingHashLoadBalancer implements LoadBalancer {
private constructRing(
endpointList: Endpoint[],
config: RingHashLoadBalancingConfig
config: RingHashLoadBalancingConfig,
ringHashSizeCap: number
) {
this.ring = [];
const endpointWeights: EndpointWeight[] = [];
@ -336,8 +333,8 @@ class RingHashLoadBalancer implements LoadBalancer {
minNormalizedWeight
);
}
const minRingSize = Math.min(config.getMinRingSize(), this.ringHashSizeCap);
const maxRingSize = Math.min(config.getMaxRingSize(), this.ringHashSizeCap);
const minRingSize = Math.min(config.getMinRingSize(), ringHashSizeCap);
const maxRingSize = Math.min(config.getMaxRingSize(), ringHashSizeCap);
/* Calculate a scale factor that meets the following conditions:
* 1. The result is between minRingSize and maxRingSize, inclusive
* 2. The smallest normalized weight is scaled to a whole number, if it
@ -390,7 +387,7 @@ class RingHashLoadBalancer implements LoadBalancer {
updateAddressList(
endpointList: Endpoint[],
lbConfig: TypedLoadBalancingConfig,
attributes: { [key: string]: unknown }
options: ChannelOptions
): void {
if (!(lbConfig instanceof RingHashLoadBalancingConfig)) {
trace('Discarding address update with unrecognized config ' + JSON.stringify(lbConfig.toJsonObject(), undefined, 2));
@ -403,11 +400,11 @@ class RingHashLoadBalancer implements LoadBalancer {
for (const endpoint of endpointList) {
const leafBalancer = this.leafMap.get(endpoint);
if (leafBalancer) {
leafBalancer.updateEndpoint(endpoint);
leafBalancer.updateEndpoint(endpoint, options);
} else {
this.leafMap.set(
endpoint,
new LeafLoadBalancer(endpoint, this.childChannelControlHelper, this.options)
new LeafLoadBalancer(endpoint, this.childChannelControlHelper, options)
);
}
const weight = this.leafWeightMap.get(endpoint);
@ -420,8 +417,9 @@ class RingHashLoadBalancer implements LoadBalancer {
for (const leaf of removedLeaves) {
leaf.destroy();
}
const ringHashSizeCap = options['grpc.lb.ring_hash.ring_size_cap'] ?? DEFAULT_RING_SIZE_CAP
loadXxhashApi().then(() => {
this.constructRing(dedupedEndpointList, lbConfig);
this.constructRing(dedupedEndpointList, lbConfig, ringHashSizeCap);
this.updatesPaused = false;
this.calculateAndUpdateState();
});

View File

@ -178,7 +178,7 @@ export class WeightedTargetLoadBalancer implements LoadBalancer {
updateState: (connectivityState: ConnectivityState, picker: Picker) => {
this.updateState(connectivityState, picker);
},
}), parent.options);
}));
this.picker = new QueuePicker(this.childBalancer);
}
@ -190,9 +190,9 @@ export class WeightedTargetLoadBalancer implements LoadBalancer {
this.parent.maybeUpdateState();
}
updateAddressList(endpointList: Endpoint[], lbConfig: WeightedTarget, attributes: { [key: string]: unknown; }): void {
updateAddressList(endpointList: Endpoint[], lbConfig: WeightedTarget, options: ChannelOptions): void {
this.weight = lbConfig.weight;
this.childBalancer.updateAddressList(endpointList, lbConfig.child_policy, attributes);
this.childBalancer.updateAddressList(endpointList, lbConfig.child_policy, options);
}
exitIdle(): void {
this.childBalancer.exitIdle();
@ -243,7 +243,7 @@ export class WeightedTargetLoadBalancer implements LoadBalancer {
private targetList: string[] = [];
private updatesPaused = false;
constructor(private channelControlHelper: ChannelControlHelper, private options: ChannelOptions) {}
constructor(private channelControlHelper: ChannelControlHelper) {}
private maybeUpdateState() {
if (!this.updatesPaused) {
@ -319,7 +319,7 @@ export class WeightedTargetLoadBalancer implements LoadBalancer {
this.channelControlHelper.updateState(connectivityState, picker);
}
updateAddressList(addressList: Endpoint[], lbConfig: TypedLoadBalancingConfig, attributes: { [key: string]: unknown; }): void {
updateAddressList(addressList: Endpoint[], lbConfig: TypedLoadBalancingConfig, options: ChannelOptions): void {
if (!(lbConfig instanceof WeightedTargetLoadBalancingConfig)) {
// Reject a config of the wrong type
trace('Discarding address list update with unrecognized config ' + JSON.stringify(lbConfig.toJsonObject(), undefined, 2));
@ -365,7 +365,7 @@ export class WeightedTargetLoadBalancer implements LoadBalancer {
}
const targetEndpoints = childEndpointMap.get(targetName) ?? [];
trace('Assigning target ' + targetName + ' address list ' + targetEndpoints.map(endpoint => '(' + endpointToString(endpoint) + ' path=' + endpoint.localityPath + ')'));
target.updateAddressList(targetEndpoints, targetConfig, attributes);
target.updateAddressList(targetEndpoints, targetConfig, options);
}
// Deactivate targets that are not in the new config

View File

@ -40,6 +40,7 @@ import UnavailablePicker = experimental.UnavailablePicker;
import { Locality__Output } from "./generated/envoy/config/core/v3/Locality";
import { ClusterConfig, XdsConfig } from "./xds-dependency-manager";
import { CdsUpdate } from "./xds-resource-type/cluster-resource-type";
import { XDS_CLIENT_KEY, XDS_CONFIG_KEY } from "./resolver-xds";
const TRACER_NAME = 'xds_cluster_impl';
@ -211,7 +212,7 @@ class XdsClusterImplBalancer implements LoadBalancer {
private xdsClient: XdsClient | null = null;
private latestClusterConfig: ClusterConfig | null = null;
constructor(private readonly channelControlHelper: ChannelControlHelper, options: ChannelOptions) {
constructor(private readonly channelControlHelper: ChannelControlHelper) {
this.childBalancer = new ChildLoadBalancerHandler(createChildChannelControlHelper(channelControlHelper, {
createSubchannel: (subchannelAddress, subchannelArgs) => {
if (!this.xdsClient || !this.latestConfig || !this.lastestEndpointList || !this.latestClusterConfig) {
@ -248,15 +249,15 @@ class XdsClusterImplBalancer implements LoadBalancer {
channelControlHelper.updateState(connectivityState, picker);
}
}
}), options);
}));
}
updateAddressList(endpointList: Endpoint[], lbConfig: TypedLoadBalancingConfig, attributes: { [key: string]: unknown; }): void {
updateAddressList(endpointList: Endpoint[], lbConfig: TypedLoadBalancingConfig, options: ChannelOptions): void {
if (!(lbConfig instanceof XdsClusterImplLoadBalancingConfig)) {
trace('Discarding address list update with unrecognized config ' + JSON.stringify(lbConfig.toJsonObject(), undefined, 2));
return;
}
trace('Received update with config: ' + JSON.stringify(lbConfig.toJsonObject(), undefined, 2));
const xdsConfig = attributes.xdsConfig as XdsConfig;
const xdsConfig = options[XDS_CONFIG_KEY] as XdsConfig;
const maybeClusterConfig = xdsConfig.clusters.get(lbConfig.getCluster());
if (!maybeClusterConfig) {
trace('Received update with no config for cluster ' + lbConfig.getCluster());
@ -281,7 +282,7 @@ class XdsClusterImplBalancer implements LoadBalancer {
this.lastestEndpointList = endpointList;
this.latestConfig = lbConfig;
this.latestClusterConfig = clusterConfig;
this.xdsClient = attributes.xdsClient as XdsClient;
this.xdsClient = options[XDS_CLIENT_KEY] as XdsClient;
if (clusterConfig.cluster.lrsLoadReportingServer) {
this.clusterDropStats = this.xdsClient.addClusterDropStats(
clusterConfig.cluster.lrsLoadReportingServer,
@ -290,7 +291,7 @@ class XdsClusterImplBalancer implements LoadBalancer {
);
}
this.childBalancer.updateAddressList(endpointList, lbConfig.getChildPolicy(), attributes);
this.childBalancer.updateAddressList(endpointList, lbConfig.getChildPolicy(), options);
}
exitIdle(): void {
this.childBalancer.exitIdle();

View File

@ -131,7 +131,7 @@ class XdsClusterManager implements LoadBalancer {
updateState: (connectivityState: ConnectivityState, picker: Picker) => {
this.updateState(connectivityState, picker);
},
}), parent.options);
}));
this.picker = new QueuePicker(this.childBalancer);
}
@ -142,8 +142,8 @@ class XdsClusterManager implements LoadBalancer {
this.picker = picker;
this.parent.maybeUpdateState();
}
updateAddressList(endpointList: Endpoint[], childConfig: TypedLoadBalancingConfig, attributes: { [key: string]: unknown; }): void {
this.childBalancer.updateAddressList(endpointList, childConfig, attributes);
updateAddressList(endpointList: Endpoint[], childConfig: TypedLoadBalancingConfig, options: ChannelOptions): void {
this.childBalancer.updateAddressList(endpointList, childConfig, options);
}
exitIdle(): void {
this.childBalancer.exitIdle();
@ -167,7 +167,7 @@ class XdsClusterManager implements LoadBalancer {
// Shutdown is a placeholder value that will never appear in normal operation.
private currentState: ConnectivityState = ConnectivityState.SHUTDOWN;
private updatesPaused = false;
constructor(private channelControlHelper: ChannelControlHelper, private options: ChannelOptions) {}
constructor(private channelControlHelper: ChannelControlHelper) {}
private maybeUpdateState() {
if (!this.updatesPaused) {
@ -207,7 +207,7 @@ class XdsClusterManager implements LoadBalancer {
this.channelControlHelper.updateState(connectivityState, new XdsClusterManagerPicker(pickerMap));
}
updateAddressList(endpointList: Endpoint[], lbConfig: TypedLoadBalancingConfig, attributes: { [key: string]: unknown; }): void {
updateAddressList(endpointList: Endpoint[], lbConfig: TypedLoadBalancingConfig, options: ChannelOptions): void {
if (!(lbConfig instanceof XdsClusterManagerLoadBalancingConfig)) {
// Reject a config of the wrong type
trace('Discarding address list update with unrecognized config ' + JSON.stringify(lbConfig.toJsonObject(), undefined, 2));
@ -234,7 +234,7 @@ class XdsClusterManager implements LoadBalancer {
child = new this.XdsClusterManagerChildImpl(this, name);
this.children.set(name, child);
}
child.updateAddressList(endpointList, childConfig, attributes);
child.updateAddressList(endpointList, childConfig, options);
}
this.updatesPaused = false;
this.updateState();

View File

@ -73,10 +73,10 @@ class XdsWrrLocalityLoadBalancingConfig implements TypedLoadBalancingConfig {
class XdsWrrLocalityLoadBalancer implements LoadBalancer {
private childBalancer: ChildLoadBalancerHandler;
constructor(private readonly channelControlHelper: ChannelControlHelper, options: ChannelOptions) {
this.childBalancer = new ChildLoadBalancerHandler(channelControlHelper, options);
constructor(private readonly channelControlHelper: ChannelControlHelper) {
this.childBalancer = new ChildLoadBalancerHandler(channelControlHelper);
}
updateAddressList(endpointList: Endpoint[], lbConfig: TypedLoadBalancingConfig, attributes: { [key: string]: unknown; }): void {
updateAddressList(endpointList: Endpoint[], lbConfig: TypedLoadBalancingConfig, options: ChannelOptions): void {
if (!(lbConfig instanceof XdsWrrLocalityLoadBalancingConfig)) {
trace('Discarding address list update with unrecognized config ' + JSON.stringify(lbConfig, undefined, 2));
return;
@ -99,7 +99,7 @@ class XdsWrrLocalityLoadBalancer implements LoadBalancer {
targets: targets
}
};
this.childBalancer.updateAddressList(endpointList, parseLoadBalancingConfig(childConfig), attributes);
this.childBalancer.updateAddressList(endpointList, parseLoadBalancingConfig(childConfig), options);
}
exitIdle(): void {
this.childBalancer.exitIdle();

View File

@ -80,6 +80,9 @@ const RETRY_CODES: {[key: string]: status} = {
'unavailable': status.UNAVAILABLE
};
export const XDS_CONFIG_KEY = `${experimental.SUBCHANNEL_ARGS_EXCLUDE_KEY_PREFIX}.xds_config`;
export const XDS_CLIENT_KEY = 'grpc.internal.xds_client';
class XdsResolver implements Resolver {
private listenerResourceName: string | null = null;
@ -355,8 +358,8 @@ class XdsResolver implements Resolver {
loadBalancingConfig: [lbPolicyConfig]
}
this.listener.onSuccessfulResolution([], serviceConfig, null, configSelector, {
xdsClient: this.xdsClient,
xdsConfig: xdsConfig
[XDS_CLIENT_KEY]: this.xdsClient,
[XDS_CONFIG_KEY]: xdsConfig
});
}

View File

@ -1322,6 +1322,15 @@ export class XdsClient {
}
return this.certificateProviderRegistry.get(instanceName);
}
/**
* Returns a valid JSON-stringifiable object, to avoid causing a circular
* reference error when an object containing this object is stringified.
* @returns
*/
toJSON(): object {
return {};
}
}
let singletonXdsClient: XdsClient | null = null;

View File

@ -84,7 +84,7 @@ const RPC_BEHAVIOR_CHILD_CONFIG = parseLoadBalancingConfig({round_robin: {}});
class RpcBehaviorLoadBalancer implements LoadBalancer {
private child: ChildLoadBalancerHandler;
private latestConfig: RpcBehaviorLoadBalancingConfig | null = null;
constructor(channelControlHelper: ChannelControlHelper, options: ChannelOptions) {
constructor(channelControlHelper: ChannelControlHelper) {
const childChannelControlHelper = createChildChannelControlHelper(channelControlHelper, {
updateState: (state, picker) => {
if (state === connectivityState.READY && this.latestConfig) {
@ -93,14 +93,14 @@ class RpcBehaviorLoadBalancer implements LoadBalancer {
channelControlHelper.updateState(state, picker);
}
});
this.child = new ChildLoadBalancerHandler(childChannelControlHelper, options);
this.child = new ChildLoadBalancerHandler(childChannelControlHelper);
}
updateAddressList(endpointList: Endpoint[], lbConfig: TypedLoadBalancingConfig, attributes: { [key: string]: unknown; }): void {
updateAddressList(endpointList: Endpoint[], lbConfig: TypedLoadBalancingConfig, options: ChannelOptions): void {
if (!(lbConfig instanceof RpcBehaviorLoadBalancingConfig)) {
return;
}
this.latestConfig = lbConfig;
this.child.updateAddressList(endpointList, RPC_BEHAVIOR_CHILD_CONFIG, attributes);
this.child.updateAddressList(endpointList, RPC_BEHAVIOR_CHILD_CONFIG, options);
}
exitIdle(): void {
this.child.exitIdle();

View File

@ -64,3 +64,4 @@ export {
FileWatcherCertificateProviderConfig
} from './certificate-provider';
export { createCertificateProviderChannelCredentials } from './channel-credentials';
export { SUBCHANNEL_ARGS_EXCLUDE_KEY_PREFIX } from './internal-channel';

View File

@ -159,6 +159,8 @@ class ShutdownPicker implements Picker {
}
}
export const SUBCHANNEL_ARGS_EXCLUDE_KEY_PREFIX = 'grpc.internal.no_subchannel';
export class InternalChannel {
private readonly resolvingLoadBalancer: ResolvingLoadBalancer;
private readonly subchannelPool: SubchannelPool;
@ -296,10 +298,16 @@ export class InternalChannel {
subchannelAddress: SubchannelAddress,
subchannelArgs: ChannelOptions
) => {
const finalSubchannelArgs: ChannelOptions = {};
for (const [key, value] of Object.entries(subchannelArgs)) {
if (!key.startsWith(SUBCHANNEL_ARGS_EXCLUDE_KEY_PREFIX)) {
finalSubchannelArgs[key] = value;
}
}
const subchannel = this.subchannelPool.getOrCreateSubchannel(
this.target,
subchannelAddress,
Object.assign({}, this.options, subchannelArgs),
finalSubchannelArgs,
this.credentials
);
subchannel.throttleKeepalive(this.keepaliveTime);

View File

@ -30,7 +30,7 @@ import { SubchannelInterface } from './subchannel-interface';
const TYPE_NAME = 'child_load_balancer_helper';
export class ChildLoadBalancerHandler implements LoadBalancer {
export class ChildLoadBalancerHandler {
private currentChild: LoadBalancer | null = null;
private pendingChild: LoadBalancer | null = null;
private latestConfig: TypedLoadBalancingConfig | null = null;
@ -85,8 +85,7 @@ export class ChildLoadBalancerHandler implements LoadBalancer {
};
constructor(
private readonly channelControlHelper: ChannelControlHelper,
private readonly options: ChannelOptions
private readonly channelControlHelper: ChannelControlHelper
) {}
protected configUpdateRequiresNewPolicyInstance(
@ -105,7 +104,7 @@ export class ChildLoadBalancerHandler implements LoadBalancer {
updateAddressList(
endpointList: Endpoint[],
lbConfig: TypedLoadBalancingConfig,
attributes: { [key: string]: unknown }
options: ChannelOptions
): void {
let childToUpdate: LoadBalancer;
if (
@ -114,7 +113,7 @@ export class ChildLoadBalancerHandler implements LoadBalancer {
this.configUpdateRequiresNewPolicyInstance(this.latestConfig, lbConfig)
) {
const newHelper = new this.ChildPolicyHelper(this);
const newChild = createLoadBalancer(lbConfig, newHelper, this.options)!;
const newChild = createLoadBalancer(lbConfig, newHelper)!;
newHelper.setChild(newChild);
if (this.currentChild === null) {
this.currentChild = newChild;
@ -134,7 +133,7 @@ export class ChildLoadBalancerHandler implements LoadBalancer {
}
}
this.latestConfig = lbConfig;
childToUpdate.updateAddressList(endpointList, lbConfig, attributes);
childToUpdate.updateAddressList(endpointList, lbConfig, options);
}
exitIdle(): void {
if (this.currentChild) {

View File

@ -468,8 +468,7 @@ export class OutlierDetectionLoadBalancer implements LoadBalancer {
private timerStartTime: Date | null = null;
constructor(
channelControlHelper: ChannelControlHelper,
options: ChannelOptions
channelControlHelper: ChannelControlHelper
) {
this.childBalancer = new ChildLoadBalancerHandler(
createChildChannelControlHelper(channelControlHelper, {
@ -504,8 +503,7 @@ export class OutlierDetectionLoadBalancer implements LoadBalancer {
channelControlHelper.updateState(connectivityState, picker);
}
},
}),
options
})
);
this.ejectionTimer = setInterval(() => {}, 0);
clearInterval(this.ejectionTimer);
@ -760,7 +758,7 @@ export class OutlierDetectionLoadBalancer implements LoadBalancer {
updateAddressList(
endpointList: Endpoint[],
lbConfig: TypedLoadBalancingConfig,
attributes: { [key: string]: unknown }
options: ChannelOptions
): void {
if (!(lbConfig instanceof OutlierDetectionLoadBalancingConfig)) {
return;
@ -779,7 +777,7 @@ export class OutlierDetectionLoadBalancer implements LoadBalancer {
}
this.entryMap.deleteMissing(endpointList);
const childPolicy = lbConfig.getChildPolicy();
this.childBalancer.updateAddressList(endpointList, childPolicy, attributes);
this.childBalancer.updateAddressList(endpointList, childPolicy, options);
if (
lbConfig.getSuccessRateEjectionConfig() ||

View File

@ -224,7 +224,7 @@ export class PickFirstLoadBalancer implements LoadBalancer {
*/
private stickyTransientFailureMode = false;
private reportHealthStatus: boolean;
private reportHealthStatus: boolean = false;
/**
* The most recent error reported by any subchannel as it transitioned to
@ -234,6 +234,8 @@ export class PickFirstLoadBalancer implements LoadBalancer {
private latestAddressList: SubchannelAddress[] | null = null;
private latestOptions: ChannelOptions = {};
/**
* Load balancer that attempts to connect to each backend in the address list
* in order, and picks the first one that connects, using it for every
@ -242,12 +244,10 @@ export class PickFirstLoadBalancer implements LoadBalancer {
* this load balancer's owner.
*/
constructor(
private readonly channelControlHelper: ChannelControlHelper,
options: ChannelOptions
private readonly channelControlHelper: ChannelControlHelper
) {
this.connectionDelayTimeout = setTimeout(() => {}, 0);
clearTimeout(this.connectionDelayTimeout);
this.reportHealthStatus = options[REPORT_HEALTH_STATUS_OPTION_NAME];
}
private allChildrenHaveReportedTF(): boolean {
@ -461,10 +461,10 @@ export class PickFirstLoadBalancer implements LoadBalancer {
this.children = [];
}
private connectToAddressList(addressList: SubchannelAddress[]) {
private connectToAddressList(addressList: SubchannelAddress[], options: ChannelOptions) {
trace('connectToAddressList([' + addressList.map(address => subchannelAddressToString(address)) + '])');
const newChildrenList = addressList.map(address => ({
subchannel: this.channelControlHelper.createSubchannel(address, {}),
subchannel: this.channelControlHelper.createSubchannel(address, options),
hasReportedTransientFailure: false,
}));
for (const { subchannel } of newChildrenList) {
@ -499,11 +499,13 @@ export class PickFirstLoadBalancer implements LoadBalancer {
updateAddressList(
endpointList: Endpoint[],
lbConfig: TypedLoadBalancingConfig
lbConfig: TypedLoadBalancingConfig,
options: ChannelOptions
): void {
if (!(lbConfig instanceof PickFirstLoadBalancingConfig)) {
return;
}
this.reportHealthStatus = options[REPORT_HEALTH_STATUS_OPTION_NAME];
/* Previously, an update would be discarded if it was identical to the
* previous update, to minimize churn. Now the DNS resolver is
* rate-limited, so that is less of a concern. */
@ -519,7 +521,8 @@ export class PickFirstLoadBalancer implements LoadBalancer {
}
const addressList = interleaveAddressFamilies(rawAddressList);
this.latestAddressList = addressList;
this.connectToAddressList(addressList);
this.latestOptions = options;
this.connectToAddressList(addressList, options);
}
exitIdle() {
@ -527,7 +530,7 @@ export class PickFirstLoadBalancer implements LoadBalancer {
this.currentState === ConnectivityState.IDLE &&
this.latestAddressList
) {
this.connectToAddressList(this.latestAddressList);
this.connectToAddressList(this.latestAddressList, this.latestOptions);
}
}
@ -560,7 +563,7 @@ export class LeafLoadBalancer {
constructor(
private endpoint: Endpoint,
channelControlHelper: ChannelControlHelper,
options: ChannelOptions
private options: ChannelOptions
) {
const childChannelControlHelper = createChildChannelControlHelper(
channelControlHelper,
@ -573,14 +576,17 @@ export class LeafLoadBalancer {
}
);
this.pickFirstBalancer = new PickFirstLoadBalancer(
childChannelControlHelper,
{ ...options, [REPORT_HEALTH_STATUS_OPTION_NAME]: true }
childChannelControlHelper
);
this.latestPicker = new QueuePicker(this.pickFirstBalancer);
}
startConnecting() {
this.pickFirstBalancer.updateAddressList([this.endpoint], LEAF_CONFIG);
this.pickFirstBalancer.updateAddressList(
[this.endpoint],
LEAF_CONFIG,
{ ...this.options, [REPORT_HEALTH_STATUS_OPTION_NAME]: true }
);
}
/**
@ -589,7 +595,8 @@ export class LeafLoadBalancer {
* attempt is not already in progress.
* @param newEndpoint
*/
updateEndpoint(newEndpoint: Endpoint) {
updateEndpoint(newEndpoint: Endpoint, newOptions: ChannelOptions) {
this.options = newOptions;
this.endpoint = newEndpoint;
if (this.latestState !== ConnectivityState.IDLE) {
this.startConnecting();

View File

@ -103,8 +103,7 @@ export class RoundRobinLoadBalancer implements LoadBalancer {
private lastError: string | null = null;
constructor(
private readonly channelControlHelper: ChannelControlHelper,
private readonly options: ChannelOptions
private readonly channelControlHelper: ChannelControlHelper
) {
this.childChannelControlHelper = createChildChannelControlHelper(
channelControlHelper,
@ -204,7 +203,8 @@ export class RoundRobinLoadBalancer implements LoadBalancer {
updateAddressList(
endpointList: Endpoint[],
lbConfig: TypedLoadBalancingConfig
lbConfig: TypedLoadBalancingConfig,
options: ChannelOptions
): void {
this.resetSubchannelList();
trace('Connect to endpoint list ' + endpointList.map(endpointToString));
@ -214,7 +214,7 @@ export class RoundRobinLoadBalancer implements LoadBalancer {
new LeafLoadBalancer(
endpoint,
this.childChannelControlHelper,
this.options
options
)
);
for (const child of this.children) {

View File

@ -33,7 +33,7 @@ export interface ChannelControlHelper {
/**
* Returns a subchannel connected to the specified address.
* @param subchannelAddress The address to connect to
* @param subchannelArgs Extra channel arguments specified by the load balancer
* @param subchannelArgs Channel arguments to use to construct the subchannel
*/
createSubchannel(
subchannelAddress: SubchannelAddress,
@ -102,7 +102,7 @@ export interface LoadBalancer {
updateAddressList(
endpointList: Endpoint[],
lbConfig: TypedLoadBalancingConfig,
attributes: { [key: string]: unknown }
channelOptions: ChannelOptions
): void;
/**
* If the load balancer is currently in the IDLE state, start connecting.
@ -129,8 +129,7 @@ export interface LoadBalancer {
export interface LoadBalancerConstructor {
new (
channelControlHelper: ChannelControlHelper,
options: ChannelOptions
channelControlHelper: ChannelControlHelper
): LoadBalancer;
}
@ -172,14 +171,12 @@ export function registerDefaultLoadBalancerType(typeName: string) {
export function createLoadBalancer(
config: TypedLoadBalancingConfig,
channelControlHelper: ChannelControlHelper,
options: ChannelOptions
channelControlHelper: ChannelControlHelper
): LoadBalancer | null {
const typeName = config.getLoadBalancerName();
if (typeName in registeredLoadBalancerTypes) {
return new registeredLoadBalancerTypes[typeName].LoadBalancer(
channelControlHelper,
options
channelControlHelper
);
} else {
return null;

View File

@ -198,7 +198,7 @@ export class ResolvingLoadBalancer implements LoadBalancer {
constructor(
private readonly target: GrpcUri,
private readonly channelControlHelper: ChannelControlHelper,
channelOptions: ChannelOptions,
private readonly channelOptions: ChannelOptions,
private readonly onSuccessfulResolution: ResolutionCallback,
private readonly onFailedResolution: ResolutionFailureCallback
) {
@ -242,8 +242,7 @@ export class ResolvingLoadBalancer implements LoadBalancer {
channelControlHelper.addChannelzChild.bind(channelControlHelper),
removeChannelzChild:
channelControlHelper.removeChannelzChild.bind(channelControlHelper),
},
channelOptions
}
);
this.innerResolver = createResolver(
target,
@ -302,7 +301,7 @@ export class ResolvingLoadBalancer implements LoadBalancer {
this.childLoadBalancer.updateAddressList(
endpointList,
loadBalancingConfig,
attributes
{...this.channelOptions, ...attributes}
);
const finalServiceConfig =
workingServiceConfig ?? this.defaultServiceConfig;

View File

@ -123,10 +123,11 @@ describe('pick_first load balancing policy', () => {
),
}
);
const pickFirst = new PickFirstLoadBalancer(channelControlHelper, {});
const pickFirst = new PickFirstLoadBalancer(channelControlHelper);
pickFirst.updateAddressList(
[{ addresses: [{ host: 'localhost', port: 1 }] }],
config
config,
{}
);
process.nextTick(() => {
subchannels[0].transitionToState(ConnectivityState.READY);
@ -142,13 +143,14 @@ describe('pick_first load balancing policy', () => {
),
}
);
const pickFirst = new PickFirstLoadBalancer(channelControlHelper, {});
const pickFirst = new PickFirstLoadBalancer(channelControlHelper);
pickFirst.updateAddressList(
[
{ addresses: [{ host: 'localhost', port: 1 }] },
{ addresses: [{ host: 'localhost', port: 2 }] },
],
config
config,
{}
);
process.nextTick(() => {
subchannels[1].transitionToState(ConnectivityState.READY);
@ -164,7 +166,7 @@ describe('pick_first load balancing policy', () => {
),
}
);
const pickFirst = new PickFirstLoadBalancer(channelControlHelper, {});
const pickFirst = new PickFirstLoadBalancer(channelControlHelper);
pickFirst.updateAddressList(
[
{
@ -174,7 +176,8 @@ describe('pick_first load balancing policy', () => {
],
},
],
config
config,
{}
);
process.nextTick(() => {
subchannels[1].transitionToState(ConnectivityState.READY);
@ -198,10 +201,11 @@ describe('pick_first load balancing policy', () => {
),
}
);
const pickFirst = new PickFirstLoadBalancer(channelControlHelper, {});
const pickFirst = new PickFirstLoadBalancer(channelControlHelper);
pickFirst.updateAddressList(
[{ addresses: [{ host: 'localhost', port: 1 }] }],
config
config,
{}
);
});
it('Should stay CONNECTING if only some subchannels fail to connect', done => {
@ -214,13 +218,14 @@ describe('pick_first load balancing policy', () => {
),
}
);
const pickFirst = new PickFirstLoadBalancer(channelControlHelper, {});
const pickFirst = new PickFirstLoadBalancer(channelControlHelper);
pickFirst.updateAddressList(
[
{ addresses: [{ host: 'localhost', port: 1 }] },
{ addresses: [{ host: 'localhost', port: 2 }] },
],
config
config,
{}
);
process.nextTick(() => {
subchannels[0].transitionToState(ConnectivityState.TRANSIENT_FAILURE);
@ -236,13 +241,14 @@ describe('pick_first load balancing policy', () => {
),
}
);
const pickFirst = new PickFirstLoadBalancer(channelControlHelper, {});
const pickFirst = new PickFirstLoadBalancer(channelControlHelper);
pickFirst.updateAddressList(
[
{ addresses: [{ host: 'localhost', port: 1 }] },
{ addresses: [{ host: 'localhost', port: 2 }] },
],
config
config,
{}
);
process.nextTick(() => {
subchannels[0].transitionToState(ConnectivityState.TRANSIENT_FAILURE);
@ -261,13 +267,14 @@ describe('pick_first load balancing policy', () => {
),
}
);
const pickFirst = new PickFirstLoadBalancer(channelControlHelper, {});
const pickFirst = new PickFirstLoadBalancer(channelControlHelper);
pickFirst.updateAddressList(
[
{ addresses: [{ host: 'localhost', port: 1 }] },
{ addresses: [{ host: 'localhost', port: 2 }] },
],
config
config,
{}
);
process.nextTick(() => {
subchannels[0].transitionToState(ConnectivityState.TRANSIENT_FAILURE);
@ -300,13 +307,14 @@ describe('pick_first load balancing policy', () => {
),
}
);
const pickFirst = new PickFirstLoadBalancer(channelControlHelper, {});
const pickFirst = new PickFirstLoadBalancer(channelControlHelper);
pickFirst.updateAddressList(
[
{ addresses: [{ host: 'localhost', port: 1 }] },
{ addresses: [{ host: 'localhost', port: 2 }] },
],
config
config,
{}
);
});
it('Should enter READY if a subchannel connects after entering TRANSIENT_FAILURE mode', done => {
@ -327,13 +335,14 @@ describe('pick_first load balancing policy', () => {
),
}
);
const pickFirst = new PickFirstLoadBalancer(channelControlHelper, {});
const pickFirst = new PickFirstLoadBalancer(channelControlHelper);
pickFirst.updateAddressList(
[
{ addresses: [{ host: 'localhost', port: 1 }] },
{ addresses: [{ host: 'localhost', port: 2 }] },
],
config
config,
{}
);
process.nextTick(() => {
subchannels[0].transitionToState(ConnectivityState.READY);
@ -358,13 +367,14 @@ describe('pick_first load balancing policy', () => {
),
}
);
const pickFirst = new PickFirstLoadBalancer(channelControlHelper, {});
const pickFirst = new PickFirstLoadBalancer(channelControlHelper);
pickFirst.updateAddressList(
[
{ addresses: [{ host: 'localhost', port: 1 }] },
{ addresses: [{ host: 'localhost', port: 2 }] },
],
config
config,
{}
);
process.nextTick(() => {
currentStartState = ConnectivityState.CONNECTING;
@ -373,7 +383,8 @@ describe('pick_first load balancing policy', () => {
{ addresses: [{ host: 'localhost', port: 1 }] },
{ addresses: [{ host: 'localhost', port: 2 }] },
],
config
config,
{}
);
});
});
@ -396,19 +407,21 @@ describe('pick_first load balancing policy', () => {
),
}
);
const pickFirst = new PickFirstLoadBalancer(channelControlHelper, {});
const pickFirst = new PickFirstLoadBalancer(channelControlHelper);
pickFirst.updateAddressList(
[
{ addresses: [{ host: 'localhost', port: 1 }] },
{ addresses: [{ host: 'localhost', port: 2 }] },
],
config
config,
{}
);
process.nextTick(() => {
currentStartState = ConnectivityState.READY;
pickFirst.updateAddressList(
[{ addresses: [{ host: 'localhost', port: 3 }] }],
config
config,
{}
);
});
});
@ -431,10 +444,11 @@ describe('pick_first load balancing policy', () => {
),
}
);
const pickFirst = new PickFirstLoadBalancer(channelControlHelper, {});
const pickFirst = new PickFirstLoadBalancer(channelControlHelper);
pickFirst.updateAddressList(
[{ addresses: [{ host: 'localhost', port: 1 }] }],
config
config,
{}
);
process.nextTick(() => {
subchannels[0].transitionToState(ConnectivityState.IDLE);
@ -459,16 +473,18 @@ describe('pick_first load balancing policy', () => {
),
}
);
const pickFirst = new PickFirstLoadBalancer(channelControlHelper, {});
const pickFirst = new PickFirstLoadBalancer(channelControlHelper);
pickFirst.updateAddressList(
[{ addresses: [{ host: 'localhost', port: 1 }] }],
config
config,
{}
);
process.nextTick(() => {
currentStartState = ConnectivityState.IDLE;
pickFirst.updateAddressList(
[{ addresses: [{ host: 'localhost', port: 2 }] }],
config
config,
{}
);
process.nextTick(() => {
subchannels[0].transitionToState(ConnectivityState.IDLE);
@ -494,16 +510,18 @@ describe('pick_first load balancing policy', () => {
),
}
);
const pickFirst = new PickFirstLoadBalancer(channelControlHelper, {});
const pickFirst = new PickFirstLoadBalancer(channelControlHelper);
pickFirst.updateAddressList(
[{ addresses: [{ host: 'localhost', port: 1 }] }],
config
config,
{}
);
process.nextTick(() => {
currentStartState = ConnectivityState.TRANSIENT_FAILURE;
pickFirst.updateAddressList(
[{ addresses: [{ host: 'localhost', port: 2 }] }],
config
config,
{}
);
process.nextTick(() => {
subchannels[0].transitionToState(ConnectivityState.IDLE);
@ -529,15 +547,17 @@ describe('pick_first load balancing policy', () => {
),
}
);
const pickFirst = new PickFirstLoadBalancer(channelControlHelper, {});
const pickFirst = new PickFirstLoadBalancer(channelControlHelper);
pickFirst.updateAddressList(
[{ addresses: [{ host: 'localhost', port: 1 }] }],
config
config,
{}
);
process.nextTick(() => {
pickFirst.updateAddressList(
[{ addresses: [{ host: 'localhost', port: 2 }] }],
config
config,
{}
);
process.nextTick(() => {
subchannels[0].transitionToState(ConnectivityState.IDLE);
@ -575,24 +595,27 @@ describe('pick_first load balancing policy', () => {
},
}
);
const pickFirst = new PickFirstLoadBalancer(channelControlHelper, {});
const pickFirst = new PickFirstLoadBalancer(channelControlHelper);
pickFirst.updateAddressList(
[{ addresses: [{ host: 'localhost', port: 1 }] }],
config
config,
{}
);
process.nextTick(() => {
subchannels[0].transitionToState(ConnectivityState.TRANSIENT_FAILURE);
process.nextTick(() => {
pickFirst.updateAddressList(
[{ addresses: [{ host: 'localhost', port: 2 }] }],
config
config,
{}
);
process.nextTick(() => {
subchannels[1].transitionToState(ConnectivityState.TRANSIENT_FAILURE);
process.nextTick(() => {
pickFirst.updateAddressList(
[{ addresses: [{ host: 'localhost', port: 3 }] }],
config
config,
{}
);
process.nextTick(() => {
subchannels[2].transitionToState(
@ -635,20 +658,23 @@ describe('pick_first load balancing policy', () => {
},
}
);
const pickFirst = new PickFirstLoadBalancer(channelControlHelper, {});
const pickFirst = new PickFirstLoadBalancer(channelControlHelper);
pickFirst.updateAddressList(
[{ addresses: [{ host: 'localhost', port: 1 }] }],
config
config,
{}
);
process.nextTick(() => {
pickFirst.updateAddressList(
[{ addresses: [{ host: 'localhost', port: 2 }] }],
config
config,
{}
);
process.nextTick(() => {
pickFirst.updateAddressList(
[{ addresses: [{ host: 'localhost', port: 2 }] }],
config
config,
{}
);
});
});
@ -676,10 +702,11 @@ describe('pick_first load balancing policy', () => {
),
}
);
const pickFirst = new PickFirstLoadBalancer(channelControlHelper, {});
const pickFirst = new PickFirstLoadBalancer(channelControlHelper);
pickFirst.updateAddressList(
[{ addresses: [{ host: 'localhost', port: 1 }] }],
config
config,
{}
);
process.nextTick(() => {
subchannels[0].transitionToState(ConnectivityState.IDLE);
@ -698,8 +725,8 @@ describe('pick_first load balancing policy', () => {
),
}
);
const pickFirst = new PickFirstLoadBalancer(channelControlHelper, {});
pickFirst.updateAddressList([], config);
const pickFirst = new PickFirstLoadBalancer(channelControlHelper);
pickFirst.updateAddressList([], config, {});
});
describe('Address list randomization', () => {
const shuffleConfig = new PickFirstLoadBalancingConfig(true);
@ -733,20 +760,20 @@ describe('pick_first load balancing policy', () => {
for (let i = 0; i < 10; i++) {
endpoints.push({ addresses: [{ host: 'localhost', port: i + 1 }] });
}
const pickFirst = new PickFirstLoadBalancer(channelControlHelper, {});
const pickFirst = new PickFirstLoadBalancer(channelControlHelper);
/* Pick from 10 subchannels 5 times, with address randomization enabled,
* and verify that at least two different subchannels are picked. The
* probability choosing the same address every time is 1/10,000, which
* I am considering an acceptable flake rate */
pickFirst.updateAddressList(endpoints, shuffleConfig);
pickFirst.updateAddressList(endpoints, shuffleConfig, {});
process.nextTick(() => {
pickFirst.updateAddressList(endpoints, shuffleConfig);
pickFirst.updateAddressList(endpoints, shuffleConfig, {});
process.nextTick(() => {
pickFirst.updateAddressList(endpoints, shuffleConfig);
pickFirst.updateAddressList(endpoints, shuffleConfig, {});
process.nextTick(() => {
pickFirst.updateAddressList(endpoints, shuffleConfig);
pickFirst.updateAddressList(endpoints, shuffleConfig, {});
process.nextTick(() => {
pickFirst.updateAddressList(endpoints, shuffleConfig);
pickFirst.updateAddressList(endpoints, shuffleConfig, {});
process.nextTick(() => {
assert(pickedSubchannels.size > 1);
done();
@ -789,16 +816,16 @@ describe('pick_first load balancing policy', () => {
for (let i = 0; i < 10; i++) {
endpoints.push({ addresses: [{ host: 'localhost', port: i + 1 }] });
}
const pickFirst = new PickFirstLoadBalancer(channelControlHelper, {});
pickFirst.updateAddressList(endpoints, config);
const pickFirst = new PickFirstLoadBalancer(channelControlHelper);
pickFirst.updateAddressList(endpoints, config, {});
process.nextTick(() => {
pickFirst.updateAddressList(endpoints, config);
pickFirst.updateAddressList(endpoints, config, {});
process.nextTick(() => {
pickFirst.updateAddressList(endpoints, config);
pickFirst.updateAddressList(endpoints, config, {});
process.nextTick(() => {
pickFirst.updateAddressList(endpoints, config);
pickFirst.updateAddressList(endpoints, config, {});
process.nextTick(() => {
pickFirst.updateAddressList(endpoints, config);
pickFirst.updateAddressList(endpoints, config, {});
process.nextTick(() => {
assert(pickedSubchannels.size === 1);
done();