mirror of https://github.com/grpc/grpc-node.git
Merge pull request #2561 from murgatroid99/grpc-js_pick_first_leaf
grpc-js: Make pick_first the universal leaf policy, plus related changes
This commit is contained in:
commit
092d1e96e2
|
@ -34,7 +34,7 @@ import TypedLoadBalancingConfig = grpc.experimental.TypedLoadBalancingConfig;
|
|||
import LoadBalancer = grpc.experimental.LoadBalancer;
|
||||
import ChannelControlHelper = grpc.experimental.ChannelControlHelper;
|
||||
import ChildLoadBalancerHandler = grpc.experimental.ChildLoadBalancerHandler;
|
||||
import SubchannelAddress = grpc.experimental.SubchannelAddress;
|
||||
import Endpoint = grpc.experimental.Endpoint;
|
||||
import Picker = grpc.experimental.Picker;
|
||||
import PickArgs = grpc.experimental.PickArgs;
|
||||
import PickResult = grpc.experimental.PickResult;
|
||||
|
@ -99,12 +99,12 @@ class RpcBehaviorLoadBalancer implements LoadBalancer {
|
|||
});
|
||||
this.child = new ChildLoadBalancerHandler(childChannelControlHelper);
|
||||
}
|
||||
updateAddressList(addressList: SubchannelAddress[], lbConfig: TypedLoadBalancingConfig, attributes: { [key: string]: unknown; }): void {
|
||||
updateAddressList(endpointList: Endpoint[], lbConfig: TypedLoadBalancingConfig, attributes: { [key: string]: unknown; }): void {
|
||||
if (!(lbConfig instanceof RpcBehaviorLoadBalancingConfig)) {
|
||||
return;
|
||||
}
|
||||
this.latestConfig = lbConfig;
|
||||
this.child.updateAddressList(addressList, RPC_BEHAVIOR_CHILD_CONFIG, attributes);
|
||||
this.child.updateAddressList(endpointList, RPC_BEHAVIOR_CHILD_CONFIG, attributes);
|
||||
}
|
||||
exitIdle(): void {
|
||||
this.child.exitIdle();
|
||||
|
|
|
@ -18,23 +18,16 @@
|
|||
import { connectivityState, status, Metadata, logVerbosity, experimental, LoadBalancingConfig } from '@grpc/grpc-js';
|
||||
import { getSingletonXdsClient, Watcher, XdsClient } from './xds-client';
|
||||
import { Cluster__Output } from './generated/envoy/config/cluster/v3/Cluster';
|
||||
import SubchannelAddress = experimental.SubchannelAddress;
|
||||
import Endpoint = experimental.Endpoint;
|
||||
import UnavailablePicker = experimental.UnavailablePicker;
|
||||
import ChildLoadBalancerHandler = experimental.ChildLoadBalancerHandler;
|
||||
import LoadBalancer = experimental.LoadBalancer;
|
||||
import ChannelControlHelper = experimental.ChannelControlHelper;
|
||||
import registerLoadBalancerType = experimental.registerLoadBalancerType;
|
||||
import TypedLoadBalancingConfig = experimental.TypedLoadBalancingConfig;
|
||||
import SuccessRateEjectionConfig = experimental.SuccessRateEjectionConfig;
|
||||
import FailurePercentageEjectionConfig = experimental.FailurePercentageEjectionConfig;
|
||||
import QueuePicker = experimental.QueuePicker;
|
||||
import OutlierDetectionRawConfig = experimental.OutlierDetectionRawConfig;
|
||||
import parseLoadBalancingConfig = experimental.parseLoadBalancingConfig;
|
||||
import { OutlierDetection__Output } from './generated/envoy/config/cluster/v3/OutlierDetection';
|
||||
import { Duration__Output } from './generated/google/protobuf/Duration';
|
||||
import { EXPERIMENTAL_OUTLIER_DETECTION } from './environment';
|
||||
import { DiscoveryMechanism, XdsClusterResolverChildPolicyHandler } from './load-balancer-xds-cluster-resolver';
|
||||
import { CLUSTER_CONFIG_TYPE_URL, decodeSingleResource } from './resources';
|
||||
import { CdsUpdate, ClusterResourceType } from './xds-resource-type/cluster-resource-type';
|
||||
|
||||
const TRACER_NAME = 'cds_balancer';
|
||||
|
@ -258,7 +251,7 @@ export class CdsLoadBalancer implements LoadBalancer {
|
|||
}
|
||||
|
||||
updateAddressList(
|
||||
addressList: SubchannelAddress[],
|
||||
endpointList: Endpoint[],
|
||||
lbConfig: TypedLoadBalancingConfig,
|
||||
attributes: { [key: string]: unknown }
|
||||
): void {
|
||||
|
|
|
@ -19,8 +19,8 @@ import { connectivityState as ConnectivityState, status as Status, Metadata, log
|
|||
import LoadBalancer = experimental.LoadBalancer;
|
||||
import ChannelControlHelper = experimental.ChannelControlHelper;
|
||||
import registerLoadBalancerType = experimental.registerLoadBalancerType;
|
||||
import SubchannelAddress = experimental.SubchannelAddress;
|
||||
import subchannelAddressToString = experimental.subchannelAddressToString;
|
||||
import Endpoint = experimental.Endpoint;
|
||||
import endpointToString = experimental.endpointToString;
|
||||
import TypedLoadBalancingConfig = experimental.TypedLoadBalancingConfig;
|
||||
import Picker = experimental.Picker;
|
||||
import QueuePicker = experimental.QueuePicker;
|
||||
|
@ -40,16 +40,16 @@ const TYPE_NAME = 'priority';
|
|||
const DEFAULT_FAILOVER_TIME_MS = 10_000;
|
||||
const DEFAULT_RETENTION_INTERVAL_MS = 15 * 60 * 1000;
|
||||
|
||||
export type LocalitySubchannelAddress = SubchannelAddress & {
|
||||
export interface LocalityEndpoint extends Endpoint {
|
||||
localityPath: string[];
|
||||
locality: Locality__Output;
|
||||
weight: number;
|
||||
};
|
||||
|
||||
export function isLocalitySubchannelAddress(
|
||||
address: SubchannelAddress
|
||||
): address is LocalitySubchannelAddress {
|
||||
return Array.isArray((address as LocalitySubchannelAddress).localityPath);
|
||||
export function isLocalityEndpoint(
|
||||
address: Endpoint
|
||||
): address is LocalityEndpoint {
|
||||
return Array.isArray((address as LocalityEndpoint).localityPath);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -138,7 +138,7 @@ class PriorityLoadBalancingConfig implements TypedLoadBalancingConfig {
|
|||
|
||||
interface PriorityChildBalancer {
|
||||
updateAddressList(
|
||||
addressList: SubchannelAddress[],
|
||||
endpointList: Endpoint[],
|
||||
lbConfig: TypedLoadBalancingConfig,
|
||||
attributes: { [key: string]: unknown }
|
||||
): void;
|
||||
|
@ -154,7 +154,7 @@ interface PriorityChildBalancer {
|
|||
}
|
||||
|
||||
interface UpdateArgs {
|
||||
subchannelAddress: SubchannelAddress[];
|
||||
subchannelAddress: Endpoint[];
|
||||
lbConfig: TypedLoadBalancingConfig;
|
||||
ignoreReresolutionRequests: boolean;
|
||||
}
|
||||
|
@ -218,11 +218,11 @@ export class PriorityLoadBalancer implements LoadBalancer {
|
|||
}
|
||||
|
||||
updateAddressList(
|
||||
addressList: SubchannelAddress[],
|
||||
endpointList: Endpoint[],
|
||||
lbConfig: TypedLoadBalancingConfig,
|
||||
attributes: { [key: string]: unknown }
|
||||
): void {
|
||||
this.childBalancer.updateAddressList(addressList, lbConfig, attributes);
|
||||
this.childBalancer.updateAddressList(endpointList, lbConfig, attributes);
|
||||
}
|
||||
|
||||
exitIdle() {
|
||||
|
@ -412,7 +412,7 @@ export class PriorityLoadBalancer implements LoadBalancer {
|
|||
}
|
||||
|
||||
updateAddressList(
|
||||
addressList: SubchannelAddress[],
|
||||
endpointList: Endpoint[],
|
||||
lbConfig: TypedLoadBalancingConfig,
|
||||
attributes: { [key: string]: unknown }
|
||||
): void {
|
||||
|
@ -425,23 +425,23 @@ export class PriorityLoadBalancer implements LoadBalancer {
|
|||
* which child it belongs to. So we bucket those addresses by that first
|
||||
* element, and pass along the rest of the localityPath for that child
|
||||
* to use. */
|
||||
const childAddressMap: Map<string, LocalitySubchannelAddress[]> = new Map<
|
||||
const childAddressMap: Map<string, LocalityEndpoint[]> = new Map<
|
||||
string,
|
||||
LocalitySubchannelAddress[]
|
||||
LocalityEndpoint[]
|
||||
>();
|
||||
for (const address of addressList) {
|
||||
if (!isLocalitySubchannelAddress(address)) {
|
||||
for (const endpoint of endpointList) {
|
||||
if (!isLocalityEndpoint(endpoint)) {
|
||||
// Reject address that cannot be prioritized
|
||||
return;
|
||||
}
|
||||
if (address.localityPath.length < 1) {
|
||||
if (endpoint.localityPath.length < 1) {
|
||||
// Reject address that cannot be prioritized
|
||||
return;
|
||||
}
|
||||
const childName = address.localityPath[0];
|
||||
const childAddress: LocalitySubchannelAddress = {
|
||||
...address,
|
||||
localityPath: address.localityPath.slice(1),
|
||||
const childName = endpoint.localityPath[0];
|
||||
const childAddress: LocalityEndpoint = {
|
||||
...endpoint,
|
||||
localityPath: endpoint.localityPath.slice(1),
|
||||
};
|
||||
let childAddressList = childAddressMap.get(childName);
|
||||
if (childAddressList === undefined) {
|
||||
|
@ -458,7 +458,7 @@ export class PriorityLoadBalancer implements LoadBalancer {
|
|||
* update all existing children with their new configs */
|
||||
for (const [childName, childConfig] of lbConfig.getChildren()) {
|
||||
const childAddresses = childAddressMap.get(childName) ?? [];
|
||||
trace('Assigning child ' + childName + ' address list ' + childAddresses.map(address => '(' + subchannelAddressToString(address) + ' path=' + address.localityPath + ')'))
|
||||
trace('Assigning child ' + childName + ' endpoint list ' + childAddresses.map(endpoint => '(' + endpointToString(endpoint) + ' path=' + endpoint.localityPath + ')'))
|
||||
this.latestUpdates.set(childName, {
|
||||
subchannelAddress: childAddresses,
|
||||
lbConfig: childConfig.config,
|
||||
|
|
|
@ -16,7 +16,7 @@
|
|||
*/
|
||||
|
||||
import { connectivityState as ConnectivityState, status as Status, Metadata, logVerbosity, experimental, LoadBalancingConfig } from "@grpc/grpc-js";
|
||||
import { isLocalitySubchannelAddress, LocalitySubchannelAddress } from "./load-balancer-priority";
|
||||
import { isLocalityEndpoint, LocalityEndpoint } from "./load-balancer-priority";
|
||||
import TypedLoadBalancingConfig = experimental.TypedLoadBalancingConfig;
|
||||
import LoadBalancer = experimental.LoadBalancer;
|
||||
import ChannelControlHelper = experimental.ChannelControlHelper;
|
||||
|
@ -27,8 +27,8 @@ import PickResult = experimental.PickResult;
|
|||
import PickArgs = experimental.PickArgs;
|
||||
import QueuePicker = experimental.QueuePicker;
|
||||
import UnavailablePicker = experimental.UnavailablePicker;
|
||||
import SubchannelAddress = experimental.SubchannelAddress;
|
||||
import subchannelAddressToString = experimental.subchannelAddressToString;
|
||||
import Endpoint = experimental.Endpoint;
|
||||
import endpointToString = experimental.endpointToString;
|
||||
import selectLbConfigFromList = experimental.selectLbConfigFromList;
|
||||
|
||||
const TRACER_NAME = 'weighted_target';
|
||||
|
@ -154,7 +154,7 @@ class WeightedTargetPicker implements Picker {
|
|||
}
|
||||
|
||||
interface WeightedChild {
|
||||
updateAddressList(addressList: SubchannelAddress[], lbConfig: WeightedTarget, attributes: { [key: string]: unknown; }): void;
|
||||
updateAddressList(endpointList: Endpoint[], lbConfig: WeightedTarget, attributes: { [key: string]: unknown; }): void;
|
||||
exitIdle(): void;
|
||||
resetBackoff(): void;
|
||||
destroy(): void;
|
||||
|
@ -190,9 +190,9 @@ export class WeightedTargetLoadBalancer implements LoadBalancer {
|
|||
this.parent.maybeUpdateState();
|
||||
}
|
||||
|
||||
updateAddressList(addressList: SubchannelAddress[], lbConfig: WeightedTarget, attributes: { [key: string]: unknown; }): void {
|
||||
updateAddressList(endpointList: Endpoint[], lbConfig: WeightedTarget, attributes: { [key: string]: unknown; }): void {
|
||||
this.weight = lbConfig.weight;
|
||||
this.childBalancer.updateAddressList(addressList, lbConfig.child_policy, attributes);
|
||||
this.childBalancer.updateAddressList(endpointList, lbConfig.child_policy, attributes);
|
||||
}
|
||||
exitIdle(): void {
|
||||
this.childBalancer.exitIdle();
|
||||
|
@ -319,7 +319,7 @@ export class WeightedTargetLoadBalancer implements LoadBalancer {
|
|||
this.channelControlHelper.updateState(connectivityState, picker);
|
||||
}
|
||||
|
||||
updateAddressList(addressList: SubchannelAddress[], lbConfig: TypedLoadBalancingConfig, attributes: { [key: string]: unknown; }): void {
|
||||
updateAddressList(addressList: Endpoint[], lbConfig: TypedLoadBalancingConfig, attributes: { [key: string]: unknown; }): void {
|
||||
if (!(lbConfig instanceof WeightedTargetLoadBalancingConfig)) {
|
||||
// Reject a config of the wrong type
|
||||
trace('Discarding address list update with unrecognized config ' + JSON.stringify(lbConfig.toJsonObject(), undefined, 2));
|
||||
|
@ -330,9 +330,9 @@ export class WeightedTargetLoadBalancer implements LoadBalancer {
|
|||
* which child it belongs to. So we bucket those addresses by that first
|
||||
* element, and pass along the rest of the localityPath for that child
|
||||
* to use. */
|
||||
const childAddressMap = new Map<string, LocalitySubchannelAddress[]>();
|
||||
const childEndpointMap = new Map<string, LocalityEndpoint[]>();
|
||||
for (const address of addressList) {
|
||||
if (!isLocalitySubchannelAddress(address)) {
|
||||
if (!isLocalityEndpoint(address)) {
|
||||
// Reject address that cannot be associated with targets
|
||||
return;
|
||||
}
|
||||
|
@ -341,14 +341,14 @@ export class WeightedTargetLoadBalancer implements LoadBalancer {
|
|||
return;
|
||||
}
|
||||
const childName = address.localityPath[0];
|
||||
const childAddress: LocalitySubchannelAddress = {
|
||||
const childAddress: LocalityEndpoint = {
|
||||
...address,
|
||||
localityPath: address.localityPath.slice(1),
|
||||
};
|
||||
let childAddressList = childAddressMap.get(childName);
|
||||
let childAddressList = childEndpointMap.get(childName);
|
||||
if (childAddressList === undefined) {
|
||||
childAddressList = [];
|
||||
childAddressMap.set(childName, childAddressList);
|
||||
childEndpointMap.set(childName, childAddressList);
|
||||
}
|
||||
childAddressList.push(childAddress);
|
||||
}
|
||||
|
@ -363,9 +363,9 @@ export class WeightedTargetLoadBalancer implements LoadBalancer {
|
|||
} else {
|
||||
target.maybeReactivate();
|
||||
}
|
||||
const targetAddresses = childAddressMap.get(targetName) ?? [];
|
||||
trace('Assigning target ' + targetName + ' address list ' + targetAddresses.map(address => '(' + subchannelAddressToString(address) + ' path=' + address.localityPath + ')'));
|
||||
target.updateAddressList(targetAddresses, targetConfig, attributes);
|
||||
const targetEndpoints = childEndpointMap.get(targetName) ?? [];
|
||||
trace('Assigning target ' + targetName + ' address list ' + targetEndpoints.map(endpoint => '(' + endpointToString(endpoint) + ' path=' + endpoint.localityPath + ')'));
|
||||
target.updateAddressList(targetEndpoints, targetConfig, attributes);
|
||||
}
|
||||
|
||||
// Deactivate targets that are not in the new config
|
||||
|
|
|
@ -18,11 +18,13 @@
|
|||
import { experimental, logVerbosity, status as Status, Metadata, connectivityState } from "@grpc/grpc-js";
|
||||
import { validateXdsServerConfig, XdsServerConfig } from "./xds-bootstrap";
|
||||
import { getSingletonXdsClient, XdsClient, XdsClusterDropStats, XdsClusterLocalityStats } from "./xds-client";
|
||||
import { LocalitySubchannelAddress } from "./load-balancer-priority";
|
||||
import { LocalityEndpoint } from "./load-balancer-priority";
|
||||
|
||||
import LoadBalancer = experimental.LoadBalancer;
|
||||
import registerLoadBalancerType = experimental.registerLoadBalancerType;
|
||||
import SubchannelAddress = experimental.SubchannelAddress;
|
||||
import Endpoint = experimental.Endpoint;
|
||||
import endpointHasAddress = experimental.endpointHasAddress;
|
||||
import subchannelAddressToString = experimental.subchannelAddressToString;
|
||||
import Picker = experimental.Picker;
|
||||
import PickArgs = experimental.PickArgs;
|
||||
import PickResult = experimental.PickResult;
|
||||
|
@ -34,6 +36,7 @@ import TypedLoadBalancingConfig = experimental.TypedLoadBalancingConfig;
|
|||
import selectLbConfigFromList = experimental.selectLbConfigFromList;
|
||||
import SubchannelInterface = experimental.SubchannelInterface;
|
||||
import BaseSubchannelWrapper = experimental.BaseSubchannelWrapper;
|
||||
import { Locality__Output } from "./generated/envoy/config/core/v3/Locality";
|
||||
|
||||
const TRACER_NAME = 'xds_cluster_impl';
|
||||
|
||||
|
@ -245,6 +248,7 @@ function getCallCounterMapKey(cluster: string, edsServiceName?: string): string
|
|||
|
||||
class XdsClusterImplBalancer implements LoadBalancer {
|
||||
private childBalancer: ChildLoadBalancerHandler;
|
||||
private lastestEndpointList: Endpoint[] | null = null;
|
||||
private latestConfig: XdsClusterImplLoadBalancingConfig | null = null;
|
||||
private clusterDropStats: XdsClusterDropStats | null = null;
|
||||
private xdsClient: XdsClient | null = null;
|
||||
|
@ -252,11 +256,20 @@ class XdsClusterImplBalancer implements LoadBalancer {
|
|||
constructor(private readonly channelControlHelper: ChannelControlHelper) {
|
||||
this.childBalancer = new ChildLoadBalancerHandler(createChildChannelControlHelper(channelControlHelper, {
|
||||
createSubchannel: (subchannelAddress, subchannelArgs) => {
|
||||
if (!this.xdsClient || !this.latestConfig) {
|
||||
if (!this.xdsClient || !this.latestConfig || !this.lastestEndpointList) {
|
||||
throw new Error('xds_cluster_impl: invalid state: createSubchannel called with xdsClient or latestConfig not populated');
|
||||
}
|
||||
const locality = (subchannelAddress as LocalitySubchannelAddress).locality ?? '';
|
||||
const wrapperChild = channelControlHelper.createSubchannel(subchannelAddress, subchannelArgs);
|
||||
let locality: Locality__Output | null = null;
|
||||
for (const endpoint of this.lastestEndpointList) {
|
||||
if (endpointHasAddress(endpoint, subchannelAddress)) {
|
||||
locality = (endpoint as LocalityEndpoint).locality;
|
||||
}
|
||||
}
|
||||
if (locality === null) {
|
||||
trace('Not reporting load for address ' + subchannelAddressToString(subchannelAddress) + ' because it has unknown locality.');
|
||||
return wrapperChild;
|
||||
}
|
||||
const lrsServer = this.latestConfig.getLrsLoadReportingServer();
|
||||
let statsObj: XdsClusterLocalityStats | null = null;
|
||||
if (lrsServer) {
|
||||
|
@ -279,15 +292,15 @@ class XdsClusterImplBalancer implements LoadBalancer {
|
|||
}
|
||||
}));
|
||||
}
|
||||
updateAddressList(addressList: SubchannelAddress[], lbConfig: TypedLoadBalancingConfig, attributes: { [key: string]: unknown; }): void {
|
||||
updateAddressList(endpointList: Endpoint[], lbConfig: TypedLoadBalancingConfig, attributes: { [key: string]: unknown; }): void {
|
||||
if (!(lbConfig instanceof XdsClusterImplLoadBalancingConfig)) {
|
||||
trace('Discarding address list update with unrecognized config ' + JSON.stringify(lbConfig.toJsonObject(), undefined, 2));
|
||||
return;
|
||||
}
|
||||
trace('Received update with config: ' + JSON.stringify(lbConfig, undefined, 2));
|
||||
this.lastestEndpointList = endpointList;
|
||||
this.latestConfig = lbConfig;
|
||||
this.xdsClient = attributes.xdsClient as XdsClient;
|
||||
|
||||
if (lbConfig.getLrsLoadReportingServer()) {
|
||||
this.clusterDropStats = this.xdsClient.addClusterDropStats(
|
||||
lbConfig.getLrsLoadReportingServer()!,
|
||||
|
@ -296,7 +309,7 @@ class XdsClusterImplBalancer implements LoadBalancer {
|
|||
);
|
||||
}
|
||||
|
||||
this.childBalancer.updateAddressList(addressList, lbConfig.getChildPolicy(), attributes);
|
||||
this.childBalancer.updateAddressList(endpointList, lbConfig.getChildPolicy(), attributes);
|
||||
}
|
||||
exitIdle(): void {
|
||||
this.childBalancer.exitIdle();
|
||||
|
|
|
@ -25,7 +25,7 @@ import PickArgs = experimental.PickArgs;
|
|||
import PickResultType = experimental.PickResultType;
|
||||
import UnavailablePicker = experimental.UnavailablePicker;
|
||||
import QueuePicker = experimental.QueuePicker;
|
||||
import SubchannelAddress = experimental.SubchannelAddress;
|
||||
import Endpoint = experimental.Endpoint;
|
||||
import ChildLoadBalancerHandler = experimental.ChildLoadBalancerHandler;
|
||||
import ChannelControlHelper = experimental.ChannelControlHelper;
|
||||
import selectLbConfigFromList = experimental.selectLbConfigFromList;
|
||||
|
@ -111,7 +111,7 @@ class XdsClusterManagerPicker implements Picker {
|
|||
}
|
||||
|
||||
interface XdsClusterManagerChild {
|
||||
updateAddressList(addressList: SubchannelAddress[], childConfig: TypedLoadBalancingConfig, attributes: { [key: string]: unknown; }): void;
|
||||
updateAddressList(endpointList: Endpoint[], childConfig: TypedLoadBalancingConfig, attributes: { [key: string]: unknown; }): void;
|
||||
exitIdle(): void;
|
||||
resetBackoff(): void;
|
||||
destroy(): void;
|
||||
|
@ -142,8 +142,8 @@ class XdsClusterManager implements LoadBalancer {
|
|||
this.picker = picker;
|
||||
this.parent.maybeUpdateState();
|
||||
}
|
||||
updateAddressList(addressList: SubchannelAddress[], childConfig: TypedLoadBalancingConfig, attributes: { [key: string]: unknown; }): void {
|
||||
this.childBalancer.updateAddressList(addressList, childConfig, attributes);
|
||||
updateAddressList(endpointList: Endpoint[], childConfig: TypedLoadBalancingConfig, attributes: { [key: string]: unknown; }): void {
|
||||
this.childBalancer.updateAddressList(endpointList, childConfig, attributes);
|
||||
}
|
||||
exitIdle(): void {
|
||||
this.childBalancer.exitIdle();
|
||||
|
@ -235,7 +235,7 @@ class XdsClusterManager implements LoadBalancer {
|
|||
this.channelControlHelper.updateState(connectivityState, picker);
|
||||
}
|
||||
|
||||
updateAddressList(addressList: SubchannelAddress[], lbConfig: TypedLoadBalancingConfig, attributes: { [key: string]: unknown; }): void {
|
||||
updateAddressList(endpointList: Endpoint[], lbConfig: TypedLoadBalancingConfig, attributes: { [key: string]: unknown; }): void {
|
||||
if (!(lbConfig instanceof XdsClusterManagerLoadBalancingConfig)) {
|
||||
// Reject a config of the wrong type
|
||||
trace('Discarding address list update with unrecognized config ' + JSON.stringify(lbConfig.toJsonObject(), undefined, 2));
|
||||
|
@ -259,7 +259,7 @@ class XdsClusterManager implements LoadBalancer {
|
|||
for (const [name, childConfig] of configChildren.entries()) {
|
||||
if (!this.children.has(name)) {
|
||||
const newChild = new this.XdsClusterManagerChildImpl(this, name);
|
||||
newChild.updateAddressList(addressList, childConfig, attributes);
|
||||
newChild.updateAddressList(endpointList, childConfig, attributes);
|
||||
this.children.set(name, newChild);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -20,7 +20,7 @@ import { registerLoadBalancerType } from "@grpc/grpc-js/build/src/load-balancer"
|
|||
import { EXPERIMENTAL_OUTLIER_DETECTION } from "./environment";
|
||||
import { Locality__Output } from "./generated/envoy/config/core/v3/Locality";
|
||||
import { ClusterLoadAssignment__Output } from "./generated/envoy/config/endpoint/v3/ClusterLoadAssignment";
|
||||
import { LocalitySubchannelAddress, PriorityChildRaw } from "./load-balancer-priority";
|
||||
import { LocalityEndpoint, PriorityChildRaw } from "./load-balancer-priority";
|
||||
import { getSingletonXdsClient, Watcher, XdsClient } from "./xds-client";
|
||||
import { DropCategory } from "./load-balancer-xds-cluster-impl";
|
||||
|
||||
|
@ -28,11 +28,13 @@ import TypedLoadBalancingConfig = experimental.TypedLoadBalancingConfig;
|
|||
import LoadBalancer = experimental.LoadBalancer;
|
||||
import Resolver = experimental.Resolver;
|
||||
import SubchannelAddress = experimental.SubchannelAddress;
|
||||
import Endpoint = experimental.Endpoint;
|
||||
import ChildLoadBalancerHandler = experimental.ChildLoadBalancerHandler;
|
||||
import createResolver = experimental.createResolver;
|
||||
import ChannelControlHelper = experimental.ChannelControlHelper;
|
||||
import OutlierDetectionRawConfig = experimental.OutlierDetectionRawConfig;
|
||||
import subchannelAddressToString = experimental.subchannelAddressToString;
|
||||
import endpointToString = experimental.endpointToString;
|
||||
import selectLbConfigFromList = experimental.selectLbConfigFromList;
|
||||
import parseLoadBalancingConfig = experimental.parseLoadBalancingConfig;
|
||||
import UnavailablePicker = experimental.UnavailablePicker;
|
||||
|
@ -116,7 +118,7 @@ class XdsClusterResolverLoadBalancingConfig implements TypedLoadBalancingConfig
|
|||
interface LocalityEntry {
|
||||
locality: Locality__Output;
|
||||
weight: number;
|
||||
addresses: SubchannelAddress[];
|
||||
endpoints: Endpoint[];
|
||||
}
|
||||
|
||||
interface PriorityEntry {
|
||||
|
@ -164,18 +166,20 @@ function getEdsPriorities(edsUpdate: ClusterLoadAssignment__Output): PriorityEnt
|
|||
if (!endpoint.load_balancing_weight) {
|
||||
continue;
|
||||
}
|
||||
const addresses: SubchannelAddress[] = endpoint.lb_endpoints.filter(lbEndpoint => lbEndpoint.health_status === 'UNKNOWN' || lbEndpoint.health_status === 'HEALTHY').map(
|
||||
const endpoints: Endpoint[] = endpoint.lb_endpoints.filter(lbEndpoint => lbEndpoint.health_status === 'UNKNOWN' || lbEndpoint.health_status === 'HEALTHY').map(
|
||||
(lbEndpoint) => {
|
||||
/* The validator in the XdsClient class ensures that each endpoint has
|
||||
* a socket_address with an IP address and a port_value. */
|
||||
const socketAddress = lbEndpoint.endpoint!.address!.socket_address!;
|
||||
return {
|
||||
host: socketAddress.address!,
|
||||
port: socketAddress.port_value!,
|
||||
addresses: [{
|
||||
host: socketAddress.address!,
|
||||
port: socketAddress.port_value!,
|
||||
}]
|
||||
};
|
||||
}
|
||||
);
|
||||
if (addresses.length === 0) {
|
||||
if (endpoints.length === 0) {
|
||||
continue;
|
||||
}
|
||||
let priorityEntry: PriorityEntry;
|
||||
|
@ -190,7 +194,7 @@ function getEdsPriorities(edsUpdate: ClusterLoadAssignment__Output): PriorityEnt
|
|||
}
|
||||
priorityEntry.localities.push({
|
||||
locality: endpoint.locality!,
|
||||
addresses: addresses,
|
||||
endpoints: endpoints,
|
||||
weight: endpoint.load_balancing_weight.value
|
||||
});
|
||||
}
|
||||
|
@ -198,7 +202,7 @@ function getEdsPriorities(edsUpdate: ClusterLoadAssignment__Output): PriorityEnt
|
|||
return result.filter(priority => priority);
|
||||
}
|
||||
|
||||
function getDnsPriorities(addresses: SubchannelAddress[]): PriorityEntry[] {
|
||||
function getDnsPriorities(endpoints: Endpoint[]): PriorityEntry[] {
|
||||
return [{
|
||||
localities: [{
|
||||
locality: {
|
||||
|
@ -207,7 +211,7 @@ function getDnsPriorities(addresses: SubchannelAddress[]): PriorityEntry[] {
|
|||
sub_zone: ''
|
||||
},
|
||||
weight: 1,
|
||||
addresses: addresses
|
||||
endpoints: endpoints
|
||||
}],
|
||||
dropCategories: []
|
||||
}];
|
||||
|
@ -249,7 +253,7 @@ export class XdsClusterResolver implements LoadBalancer {
|
|||
}
|
||||
const fullPriorityList: string[] = [];
|
||||
const priorityChildren: {[name: string]: PriorityChildRaw} = {};
|
||||
const addressList: LocalitySubchannelAddress[] = [];
|
||||
const endpointList: LocalityEndpoint[] = [];
|
||||
const edsChildPolicy = this.latestConfig.getXdsLbPolicy();
|
||||
for (const entry of this.discoveryMechanismList) {
|
||||
const newPriorityNames: string[] = [];
|
||||
|
@ -291,15 +295,15 @@ export class XdsClusterResolver implements LoadBalancer {
|
|||
newPriorityNames[priority] = newPriorityName;
|
||||
|
||||
for (const localityObj of priorityEntry.localities) {
|
||||
for (const address of localityObj.addresses) {
|
||||
addressList.push({
|
||||
for (const endpoint of localityObj.endpoints) {
|
||||
endpointList.push({
|
||||
localityPath: [
|
||||
newPriorityName,
|
||||
localityToName(localityObj.locality),
|
||||
],
|
||||
locality: localityObj.locality,
|
||||
weight: localityObj.weight,
|
||||
...address,
|
||||
...endpoint
|
||||
});
|
||||
}
|
||||
newLocalityPriorities.set(localityToName(localityObj.locality), priority);
|
||||
|
@ -349,16 +353,16 @@ export class XdsClusterResolver implements LoadBalancer {
|
|||
this.channelControlHelper.updateState(connectivityState.TRANSIENT_FAILURE, new UnavailablePicker({code: status.UNAVAILABLE, details: `LB policy config parsing failed with error ${(e as Error).message}`, metadata: new Metadata()}));
|
||||
return;
|
||||
}
|
||||
trace('Child update addresses: ' + addressList.map(address => '(' + subchannelAddressToString(address) + ' path=' + address.localityPath + ')'));
|
||||
trace('Child update addresses: ' + endpointList.map(endpoint => '(' + endpointToString(endpoint) + ' path=' + endpoint.localityPath + ')'));
|
||||
trace('Child update priority config: ' + JSON.stringify(childConfig, undefined, 2));
|
||||
this.childBalancer.updateAddressList(
|
||||
addressList,
|
||||
endpointList,
|
||||
typedChildConfig,
|
||||
this.latestAttributes
|
||||
);
|
||||
}
|
||||
|
||||
updateAddressList(addressList: SubchannelAddress[], lbConfig: TypedLoadBalancingConfig, attributes: { [key: string]: unknown; }): void {
|
||||
updateAddressList(addressList: Endpoint[], lbConfig: TypedLoadBalancingConfig, attributes: { [key: string]: unknown; }): void {
|
||||
if (!(lbConfig instanceof XdsClusterResolverLoadBalancingConfig)) {
|
||||
trace('Discarding address list update with unrecognized config ' + JSON.stringify(lbConfig, undefined, 2));
|
||||
return;
|
||||
|
@ -399,8 +403,8 @@ export class XdsClusterResolver implements LoadBalancer {
|
|||
}
|
||||
} else {
|
||||
const resolver = createResolver({scheme: 'dns', path: mechanism.dns_hostname!}, {
|
||||
onSuccessfulResolution: addressList => {
|
||||
mechanismEntry.latestUpdate = getDnsPriorities(addressList);
|
||||
onSuccessfulResolution: endpointList => {
|
||||
mechanismEntry.latestUpdate = getDnsPriorities(endpointList);
|
||||
this.maybeUpdateChild();
|
||||
},
|
||||
onError: error => {
|
||||
|
|
|
@ -20,13 +20,13 @@
|
|||
import { LoadBalancingConfig, experimental, logVerbosity } from "@grpc/grpc-js";
|
||||
import { loadProtosWithOptionsSync } from "@grpc/proto-loader/build/src/util";
|
||||
import { WeightedTargetRaw } from "./load-balancer-weighted-target";
|
||||
import { isLocalitySubchannelAddress } from "./load-balancer-priority";
|
||||
import { isLocalityEndpoint } from "./load-balancer-priority";
|
||||
import { localityToName } from "./load-balancer-xds-cluster-resolver";
|
||||
import TypedLoadBalancingConfig = experimental.TypedLoadBalancingConfig;
|
||||
import LoadBalancer = experimental.LoadBalancer;
|
||||
import ChannelControlHelper = experimental.ChannelControlHelper;
|
||||
import ChildLoadBalancerHandler = experimental.ChildLoadBalancerHandler;
|
||||
import SubchannelAddress = experimental.SubchannelAddress;
|
||||
import Endpoint = experimental.Endpoint;
|
||||
import parseLoadBalancingConfig = experimental.parseLoadBalancingConfig;
|
||||
import registerLoadBalancerType = experimental.registerLoadBalancerType;
|
||||
import { Any__Output } from "./generated/google/protobuf/Any";
|
||||
|
@ -76,14 +76,14 @@ class XdsWrrLocalityLoadBalancer implements LoadBalancer {
|
|||
constructor(private readonly channelControlHelper: ChannelControlHelper) {
|
||||
this.childBalancer = new ChildLoadBalancerHandler(channelControlHelper);
|
||||
}
|
||||
updateAddressList(addressList: SubchannelAddress[], lbConfig: TypedLoadBalancingConfig, attributes: { [key: string]: unknown; }): void {
|
||||
updateAddressList(endpointList: Endpoint[], lbConfig: TypedLoadBalancingConfig, attributes: { [key: string]: unknown; }): void {
|
||||
if (!(lbConfig instanceof XdsWrrLocalityLoadBalancingConfig)) {
|
||||
trace('Discarding address list update with unrecognized config ' + JSON.stringify(lbConfig, undefined, 2));
|
||||
return;
|
||||
}
|
||||
const targets: {[localityName: string]: WeightedTargetRaw} = {};
|
||||
for (const address of addressList) {
|
||||
if (!isLocalitySubchannelAddress(address)) {
|
||||
for (const address of endpointList) {
|
||||
if (!isLocalityEndpoint(address)) {
|
||||
return;
|
||||
}
|
||||
const localityName = localityToName(address.locality);
|
||||
|
@ -99,7 +99,7 @@ class XdsWrrLocalityLoadBalancer implements LoadBalancer {
|
|||
targets: targets
|
||||
}
|
||||
};
|
||||
this.childBalancer.updateAddressList(addressList, parseLoadBalancingConfig(childConfig), attributes);
|
||||
this.childBalancer.updateAddressList(endpointList, parseLoadBalancingConfig(childConfig), attributes);
|
||||
}
|
||||
exitIdle(): void {
|
||||
this.childBalancer.exitIdle();
|
||||
|
|
|
@ -30,7 +30,7 @@ import TypedLoadBalancingConfig = experimental.TypedLoadBalancingConfig;
|
|||
import LoadBalancer = experimental.LoadBalancer;
|
||||
import ChannelControlHelper = experimental.ChannelControlHelper;
|
||||
import ChildLoadBalancerHandler = experimental.ChildLoadBalancerHandler;
|
||||
import SubchannelAddress = experimental.SubchannelAddress;
|
||||
import Endpoint = experimental.Endpoint;
|
||||
import Picker = experimental.Picker;
|
||||
import PickArgs = experimental.PickArgs;
|
||||
import PickResult = experimental.PickResult;
|
||||
|
@ -94,12 +94,12 @@ class RpcBehaviorLoadBalancer implements LoadBalancer {
|
|||
});
|
||||
this.child = new ChildLoadBalancerHandler(childChannelControlHelper);
|
||||
}
|
||||
updateAddressList(addressList: SubchannelAddress[], lbConfig: TypedLoadBalancingConfig, attributes: { [key: string]: unknown; }): void {
|
||||
updateAddressList(endpointList: Endpoint[], lbConfig: TypedLoadBalancingConfig, attributes: { [key: string]: unknown; }): void {
|
||||
if (!(lbConfig instanceof RpcBehaviorLoadBalancingConfig)) {
|
||||
return;
|
||||
}
|
||||
this.latestConfig = lbConfig;
|
||||
this.child.updateAddressList(addressList, RPC_BEHAVIOR_CHILD_CONFIG, attributes);
|
||||
this.child.updateAddressList(endpointList, RPC_BEHAVIOR_CHILD_CONFIG, attributes);
|
||||
}
|
||||
exitIdle(): void {
|
||||
this.child.exitIdle();
|
||||
|
|
|
@ -17,11 +17,15 @@ export {
|
|||
registerLoadBalancerType,
|
||||
selectLbConfigFromList,
|
||||
parseLoadBalancingConfig,
|
||||
isLoadBalancerNameRegistered
|
||||
isLoadBalancerNameRegistered,
|
||||
} from './load-balancer';
|
||||
export { LeafLoadBalancer } from './load-balancer-pick-first';
|
||||
export {
|
||||
SubchannelAddress,
|
||||
subchannelAddressToString,
|
||||
Endpoint,
|
||||
endpointToString,
|
||||
endpointHasAddress,
|
||||
} from './subchannel-address';
|
||||
export { ChildLoadBalancerHandler } from './load-balancer-child-handler';
|
||||
export {
|
||||
|
@ -40,6 +44,7 @@ export {
|
|||
SubchannelInterface,
|
||||
BaseSubchannelWrapper,
|
||||
ConnectivityStateListener,
|
||||
HealthListener,
|
||||
} from './subchannel-interface';
|
||||
export {
|
||||
OutlierDetectionRawConfig,
|
||||
|
|
|
@ -21,7 +21,7 @@ import {
|
|||
TypedLoadBalancingConfig,
|
||||
createLoadBalancer,
|
||||
} from './load-balancer';
|
||||
import { SubchannelAddress } from './subchannel-address';
|
||||
import { Endpoint, SubchannelAddress } from './subchannel-address';
|
||||
import { ChannelOptions } from './channel-options';
|
||||
import { ConnectivityState } from './connectivity-state';
|
||||
import { Picker } from './picker';
|
||||
|
@ -95,12 +95,12 @@ export class ChildLoadBalancerHandler implements LoadBalancer {
|
|||
|
||||
/**
|
||||
* Prerequisites: lbConfig !== null and lbConfig.name is registered
|
||||
* @param addressList
|
||||
* @param endpointList
|
||||
* @param lbConfig
|
||||
* @param attributes
|
||||
*/
|
||||
updateAddressList(
|
||||
addressList: SubchannelAddress[],
|
||||
endpointList: Endpoint[],
|
||||
lbConfig: TypedLoadBalancingConfig,
|
||||
attributes: { [key: string]: unknown }
|
||||
): void {
|
||||
|
@ -131,7 +131,7 @@ export class ChildLoadBalancerHandler implements LoadBalancer {
|
|||
}
|
||||
}
|
||||
this.latestConfig = lbConfig;
|
||||
childToUpdate.updateAddressList(addressList, lbConfig, attributes);
|
||||
childToUpdate.updateAddressList(endpointList, lbConfig, attributes);
|
||||
}
|
||||
exitIdle(): void {
|
||||
if (this.currentChild) {
|
||||
|
|
|
@ -32,12 +32,14 @@ import {
|
|||
import { ChildLoadBalancerHandler } from './load-balancer-child-handler';
|
||||
import { PickArgs, Picker, PickResult, PickResultType } from './picker';
|
||||
import {
|
||||
Endpoint,
|
||||
SubchannelAddress,
|
||||
subchannelAddressToString,
|
||||
endpointHasAddress,
|
||||
endpointToString,
|
||||
subchannelAddressEqual,
|
||||
} from './subchannel-address';
|
||||
import {
|
||||
BaseSubchannelWrapper,
|
||||
ConnectivityStateListener,
|
||||
SubchannelInterface,
|
||||
} from './subchannel-interface';
|
||||
import * as logging from './logging';
|
||||
|
@ -107,7 +109,11 @@ function validateFieldType(
|
|||
expectedType: TypeofValues,
|
||||
objectName?: string
|
||||
) {
|
||||
if (fieldName in obj && obj[fieldName] !== undefined && typeof obj[fieldName] !== expectedType) {
|
||||
if (
|
||||
fieldName in obj &&
|
||||
obj[fieldName] !== undefined &&
|
||||
typeof obj[fieldName] !== expectedType
|
||||
) {
|
||||
const fullFieldName = objectName ? `${objectName}.${fieldName}` : fieldName;
|
||||
throw new Error(
|
||||
`outlier detection config ${fullFieldName} parse error: expected ${expectedType}, got ${typeof obj[
|
||||
|
@ -149,7 +155,11 @@ function validatePositiveDuration(
|
|||
function validatePercentage(obj: any, fieldName: string, objectName?: string) {
|
||||
const fullFieldName = objectName ? `${objectName}.${fieldName}` : fieldName;
|
||||
validateFieldType(obj, fieldName, 'number', objectName);
|
||||
if (fieldName in obj && obj[fieldName] !== undefined && !(obj[fieldName] >= 0 && obj[fieldName] <= 100)) {
|
||||
if (
|
||||
fieldName in obj &&
|
||||
obj[fieldName] !== undefined &&
|
||||
!(obj[fieldName] >= 0 && obj[fieldName] <= 100)
|
||||
) {
|
||||
throw new Error(
|
||||
`outlier detection config ${fullFieldName} parse error: value out of range for percentage (0-100)`
|
||||
);
|
||||
|
@ -175,9 +185,7 @@ export class OutlierDetectionLoadBalancingConfig
|
|||
failurePercentageEjection: Partial<FailurePercentageEjectionConfig> | null,
|
||||
private readonly childPolicy: TypedLoadBalancingConfig
|
||||
) {
|
||||
if (
|
||||
childPolicy.getLoadBalancerName() === 'pick_first'
|
||||
) {
|
||||
if (childPolicy.getLoadBalancerName() === 'pick_first') {
|
||||
throw new Error(
|
||||
'outlier_detection LB policy cannot have a pick_first child policy'
|
||||
);
|
||||
|
@ -207,9 +215,10 @@ export class OutlierDetectionLoadBalancingConfig
|
|||
max_ejection_time: msToDuration(this.maxEjectionTimeMs),
|
||||
max_ejection_percent: this.maxEjectionPercent,
|
||||
success_rate_ejection: this.successRateEjection ?? undefined,
|
||||
failure_percentage_ejection: this.failurePercentageEjection ?? undefined,
|
||||
child_policy: [this.childPolicy.toJsonObject()]
|
||||
}
|
||||
failure_percentage_ejection:
|
||||
this.failurePercentageEjection ?? undefined,
|
||||
child_policy: [this.childPolicy.toJsonObject()],
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
|
@ -240,7 +249,10 @@ export class OutlierDetectionLoadBalancingConfig
|
|||
validatePositiveDuration(obj, 'base_ejection_time');
|
||||
validatePositiveDuration(obj, 'max_ejection_time');
|
||||
validatePercentage(obj, 'max_ejection_percent');
|
||||
if ('success_rate_ejection' in obj && obj.success_rate_ejection !== undefined) {
|
||||
if (
|
||||
'success_rate_ejection' in obj &&
|
||||
obj.success_rate_ejection !== undefined
|
||||
) {
|
||||
if (typeof obj.success_rate_ejection !== 'object') {
|
||||
throw new Error(
|
||||
'outlier detection config success_rate_ejection must be an object'
|
||||
|
@ -270,7 +282,10 @@ export class OutlierDetectionLoadBalancingConfig
|
|||
'success_rate_ejection'
|
||||
);
|
||||
}
|
||||
if ('failure_percentage_ejection' in obj && obj.failure_percentage_ejection !== undefined) {
|
||||
if (
|
||||
'failure_percentage_ejection' in obj &&
|
||||
obj.failure_percentage_ejection !== undefined
|
||||
) {
|
||||
if (typeof obj.failure_percentage_ejection !== 'object') {
|
||||
throw new Error(
|
||||
'outlier detection config failure_percentage_ejection must be an object'
|
||||
|
@ -305,7 +320,9 @@ export class OutlierDetectionLoadBalancingConfig
|
|||
}
|
||||
const childPolicy = selectLbConfigFromList(obj.child_policy);
|
||||
if (!childPolicy) {
|
||||
throw new Error('outlier detection config child_policy: no valid recognized policy found');
|
||||
throw new Error(
|
||||
'outlier detection config child_policy: no valid recognized policy found'
|
||||
);
|
||||
}
|
||||
|
||||
return new OutlierDetectionLoadBalancingConfig(
|
||||
|
@ -324,55 +341,12 @@ class OutlierDetectionSubchannelWrapper
|
|||
extends BaseSubchannelWrapper
|
||||
implements SubchannelInterface
|
||||
{
|
||||
private childSubchannelState: ConnectivityState;
|
||||
private stateListeners: ConnectivityStateListener[] = [];
|
||||
private ejected = false;
|
||||
private refCount = 0;
|
||||
constructor(
|
||||
childSubchannel: SubchannelInterface,
|
||||
private mapEntry?: MapEntry
|
||||
) {
|
||||
super(childSubchannel);
|
||||
this.childSubchannelState = childSubchannel.getConnectivityState();
|
||||
childSubchannel.addConnectivityStateListener(
|
||||
(subchannel, previousState, newState, keepaliveTime) => {
|
||||
this.childSubchannelState = newState;
|
||||
if (!this.ejected) {
|
||||
for (const listener of this.stateListeners) {
|
||||
listener(this, previousState, newState, keepaliveTime);
|
||||
}
|
||||
}
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
getConnectivityState(): ConnectivityState {
|
||||
if (this.ejected) {
|
||||
return ConnectivityState.TRANSIENT_FAILURE;
|
||||
} else {
|
||||
return this.childSubchannelState;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Add a listener function to be called whenever the wrapper's
|
||||
* connectivity state changes.
|
||||
* @param listener
|
||||
*/
|
||||
addConnectivityStateListener(listener: ConnectivityStateListener) {
|
||||
this.stateListeners.push(listener);
|
||||
}
|
||||
|
||||
/**
|
||||
* Remove a listener previously added with `addConnectivityStateListener`
|
||||
* @param listener A reference to a function previously passed to
|
||||
* `addConnectivityStateListener`
|
||||
*/
|
||||
removeConnectivityStateListener(listener: ConnectivityStateListener) {
|
||||
const listenerIndex = this.stateListeners.indexOf(listener);
|
||||
if (listenerIndex > -1) {
|
||||
this.stateListeners.splice(listenerIndex, 1);
|
||||
}
|
||||
}
|
||||
|
||||
ref() {
|
||||
|
@ -394,27 +368,11 @@ class OutlierDetectionSubchannelWrapper
|
|||
}
|
||||
|
||||
eject() {
|
||||
this.ejected = true;
|
||||
for (const listener of this.stateListeners) {
|
||||
listener(
|
||||
this,
|
||||
this.childSubchannelState,
|
||||
ConnectivityState.TRANSIENT_FAILURE,
|
||||
-1
|
||||
);
|
||||
}
|
||||
this.setHealthy(false);
|
||||
}
|
||||
|
||||
uneject() {
|
||||
this.ejected = false;
|
||||
for (const listener of this.stateListeners) {
|
||||
listener(
|
||||
this,
|
||||
ConnectivityState.TRANSIENT_FAILURE,
|
||||
this.childSubchannelState,
|
||||
-1
|
||||
);
|
||||
}
|
||||
this.setHealthy(true);
|
||||
}
|
||||
|
||||
getMapEntry(): MapEntry | undefined {
|
||||
|
@ -459,13 +417,6 @@ class CallCounter {
|
|||
}
|
||||
}
|
||||
|
||||
interface MapEntry {
|
||||
counter: CallCounter;
|
||||
currentEjectionTimestamp: Date | null;
|
||||
ejectionTimeMultiplier: number;
|
||||
subchannelWrappers: OutlierDetectionSubchannelWrapper[];
|
||||
}
|
||||
|
||||
class OutlierDetectionPicker implements Picker {
|
||||
constructor(private wrappedPicker: Picker, private countCalls: boolean) {}
|
||||
pick(pickArgs: PickArgs): PickResult {
|
||||
|
@ -503,9 +454,133 @@ class OutlierDetectionPicker implements Picker {
|
|||
}
|
||||
}
|
||||
|
||||
interface MapEntry {
|
||||
counter: CallCounter;
|
||||
currentEjectionTimestamp: Date | null;
|
||||
ejectionTimeMultiplier: number;
|
||||
subchannelWrappers: OutlierDetectionSubchannelWrapper[];
|
||||
}
|
||||
|
||||
interface EndpointMapEntry {
|
||||
key: Endpoint;
|
||||
value: MapEntry;
|
||||
}
|
||||
|
||||
function endpointEqualUnordered(
|
||||
endpoint1: Endpoint,
|
||||
endpoint2: Endpoint
|
||||
): boolean {
|
||||
if (endpoint1.addresses.length !== endpoint2.addresses.length) {
|
||||
return false;
|
||||
}
|
||||
for (const address1 of endpoint1.addresses) {
|
||||
let matchFound = false;
|
||||
for (const address2 of endpoint2.addresses) {
|
||||
if (subchannelAddressEqual(address1, address2)) {
|
||||
matchFound = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (!matchFound) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
class EndpointMap {
|
||||
private map: Set<EndpointMapEntry> = new Set();
|
||||
|
||||
get size() {
|
||||
return this.map.size;
|
||||
}
|
||||
|
||||
getForSubchannelAddress(address: SubchannelAddress): MapEntry | undefined {
|
||||
for (const entry of this.map) {
|
||||
if (endpointHasAddress(entry.key, address)) {
|
||||
return entry.value;
|
||||
}
|
||||
}
|
||||
return undefined;
|
||||
}
|
||||
|
||||
/**
|
||||
* Delete any entries in this map with keys that are not in endpoints
|
||||
* @param endpoints
|
||||
*/
|
||||
deleteMissing(endpoints: Endpoint[]) {
|
||||
for (const entry of this.map) {
|
||||
let foundEntry = false;
|
||||
for (const endpoint of endpoints) {
|
||||
if (endpointEqualUnordered(endpoint, entry.key)) {
|
||||
foundEntry = true;
|
||||
}
|
||||
}
|
||||
if (!foundEntry) {
|
||||
this.map.delete(entry);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
get(endpoint: Endpoint): MapEntry | undefined {
|
||||
for (const entry of this.map) {
|
||||
if (endpointEqualUnordered(endpoint, entry.key)) {
|
||||
return entry.value;
|
||||
}
|
||||
}
|
||||
return undefined;
|
||||
}
|
||||
|
||||
set(endpoint: Endpoint, mapEntry: MapEntry) {
|
||||
for (const entry of this.map) {
|
||||
if (endpointEqualUnordered(endpoint, entry.key)) {
|
||||
entry.value = mapEntry;
|
||||
return;
|
||||
}
|
||||
}
|
||||
this.map.add({ key: endpoint, value: mapEntry });
|
||||
}
|
||||
|
||||
delete(endpoint: Endpoint) {
|
||||
for (const entry of this.map) {
|
||||
if (endpointEqualUnordered(endpoint, entry.key)) {
|
||||
this.map.delete(entry);
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
has(endpoint: Endpoint): boolean {
|
||||
for (const entry of this.map) {
|
||||
if (endpointEqualUnordered(endpoint, entry.key)) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
*keys(): IterableIterator<Endpoint> {
|
||||
for (const entry of this.map) {
|
||||
yield entry.key;
|
||||
}
|
||||
}
|
||||
|
||||
*values(): IterableIterator<MapEntry> {
|
||||
for (const entry of this.map) {
|
||||
yield entry.value;
|
||||
}
|
||||
}
|
||||
|
||||
*entries(): IterableIterator<[Endpoint, MapEntry]> {
|
||||
for (const entry of this.map) {
|
||||
yield [entry.key, entry.value];
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
export class OutlierDetectionLoadBalancer implements LoadBalancer {
|
||||
private childBalancer: ChildLoadBalancerHandler;
|
||||
private addressMap: Map<string, MapEntry> = new Map<string, MapEntry>();
|
||||
private entryMap = new EndpointMap();
|
||||
private latestConfig: OutlierDetectionLoadBalancingConfig | null = null;
|
||||
private ejectionTimer: NodeJS.Timeout;
|
||||
private timerStartTime: Date | null = null;
|
||||
|
@ -521,9 +596,8 @@ export class OutlierDetectionLoadBalancer implements LoadBalancer {
|
|||
subchannelAddress,
|
||||
subchannelArgs
|
||||
);
|
||||
const mapEntry = this.addressMap.get(
|
||||
subchannelAddressToString(subchannelAddress)
|
||||
);
|
||||
const mapEntry =
|
||||
this.entryMap.getForSubchannelAddress(subchannelAddress);
|
||||
const subchannelWrapper = new OutlierDetectionSubchannelWrapper(
|
||||
originalSubchannel,
|
||||
mapEntry
|
||||
|
@ -561,12 +635,12 @@ export class OutlierDetectionLoadBalancer implements LoadBalancer {
|
|||
|
||||
private getCurrentEjectionPercent() {
|
||||
let ejectionCount = 0;
|
||||
for (const mapEntry of this.addressMap.values()) {
|
||||
for (const mapEntry of this.entryMap.values()) {
|
||||
if (mapEntry.currentEjectionTimestamp !== null) {
|
||||
ejectionCount += 1;
|
||||
}
|
||||
}
|
||||
return (ejectionCount * 100) / this.addressMap.size;
|
||||
return (ejectionCount * 100) / this.entryMap.size;
|
||||
}
|
||||
|
||||
private runSuccessRateCheck(ejectionTimestamp: Date) {
|
||||
|
@ -582,12 +656,12 @@ export class OutlierDetectionLoadBalancer implements LoadBalancer {
|
|||
const targetRequestVolume = successRateConfig.request_volume;
|
||||
let addresesWithTargetVolume = 0;
|
||||
const successRates: number[] = [];
|
||||
for (const [address, mapEntry] of this.addressMap) {
|
||||
for (const [endpoint, mapEntry] of this.entryMap.entries()) {
|
||||
const successes = mapEntry.counter.getLastSuccesses();
|
||||
const failures = mapEntry.counter.getLastFailures();
|
||||
trace(
|
||||
'Stats for ' +
|
||||
address +
|
||||
endpointToString(endpoint) +
|
||||
': successes=' +
|
||||
successes +
|
||||
' failures=' +
|
||||
|
@ -631,7 +705,7 @@ export class OutlierDetectionLoadBalancer implements LoadBalancer {
|
|||
);
|
||||
|
||||
// Step 3
|
||||
for (const [address, mapEntry] of this.addressMap.entries()) {
|
||||
for (const [address, mapEntry] of this.entryMap.entries()) {
|
||||
// Step 3.i
|
||||
if (
|
||||
this.getCurrentEjectionPercent() >=
|
||||
|
@ -683,7 +757,7 @@ export class OutlierDetectionLoadBalancer implements LoadBalancer {
|
|||
);
|
||||
// Step 1
|
||||
let addressesWithTargetVolume = 0;
|
||||
for (const mapEntry of this.addressMap.values()) {
|
||||
for (const mapEntry of this.entryMap.values()) {
|
||||
const successes = mapEntry.counter.getLastSuccesses();
|
||||
const failures = mapEntry.counter.getLastFailures();
|
||||
if (successes + failures >= failurePercentageConfig.request_volume) {
|
||||
|
@ -695,7 +769,7 @@ export class OutlierDetectionLoadBalancer implements LoadBalancer {
|
|||
}
|
||||
|
||||
// Step 2
|
||||
for (const [address, mapEntry] of this.addressMap.entries()) {
|
||||
for (const [address, mapEntry] of this.entryMap.entries()) {
|
||||
// Step 2.i
|
||||
if (
|
||||
this.getCurrentEjectionPercent() >=
|
||||
|
@ -746,7 +820,7 @@ export class OutlierDetectionLoadBalancer implements LoadBalancer {
|
|||
}
|
||||
|
||||
private switchAllBuckets() {
|
||||
for (const mapEntry of this.addressMap.values()) {
|
||||
for (const mapEntry of this.entryMap.values()) {
|
||||
mapEntry.counter.switchBuckets();
|
||||
}
|
||||
}
|
||||
|
@ -771,7 +845,7 @@ export class OutlierDetectionLoadBalancer implements LoadBalancer {
|
|||
this.runSuccessRateCheck(ejectionTimestamp);
|
||||
this.runFailurePercentageCheck(ejectionTimestamp);
|
||||
|
||||
for (const [address, mapEntry] of this.addressMap.entries()) {
|
||||
for (const [address, mapEntry] of this.entryMap.entries()) {
|
||||
if (mapEntry.currentEjectionTimestamp === null) {
|
||||
if (mapEntry.ejectionTimeMultiplier > 0) {
|
||||
mapEntry.ejectionTimeMultiplier -= 1;
|
||||
|
@ -798,21 +872,17 @@ export class OutlierDetectionLoadBalancer implements LoadBalancer {
|
|||
}
|
||||
|
||||
updateAddressList(
|
||||
addressList: SubchannelAddress[],
|
||||
endpointList: Endpoint[],
|
||||
lbConfig: TypedLoadBalancingConfig,
|
||||
attributes: { [key: string]: unknown }
|
||||
): void {
|
||||
if (!(lbConfig instanceof OutlierDetectionLoadBalancingConfig)) {
|
||||
return;
|
||||
}
|
||||
const subchannelAddresses = new Set<string>();
|
||||
for (const address of addressList) {
|
||||
subchannelAddresses.add(subchannelAddressToString(address));
|
||||
}
|
||||
for (const address of subchannelAddresses) {
|
||||
if (!this.addressMap.has(address)) {
|
||||
trace('Adding map entry for ' + address);
|
||||
this.addressMap.set(address, {
|
||||
for (const endpoint of endpointList) {
|
||||
if (!this.entryMap.has(endpoint)) {
|
||||
trace('Adding map entry for ' + endpointToString(endpoint));
|
||||
this.entryMap.set(endpoint, {
|
||||
counter: new CallCounter(),
|
||||
currentEjectionTimestamp: null,
|
||||
ejectionTimeMultiplier: 0,
|
||||
|
@ -820,14 +890,9 @@ export class OutlierDetectionLoadBalancer implements LoadBalancer {
|
|||
});
|
||||
}
|
||||
}
|
||||
for (const key of this.addressMap.keys()) {
|
||||
if (!subchannelAddresses.has(key)) {
|
||||
trace('Removing map entry for ' + key);
|
||||
this.addressMap.delete(key);
|
||||
}
|
||||
}
|
||||
this.entryMap.deleteMissing(endpointList);
|
||||
const childPolicy = lbConfig.getChildPolicy();
|
||||
this.childBalancer.updateAddressList(addressList, childPolicy, attributes);
|
||||
this.childBalancer.updateAddressList(endpointList, childPolicy, attributes);
|
||||
|
||||
if (
|
||||
lbConfig.getSuccessRateEjectionConfig() ||
|
||||
|
@ -850,7 +915,7 @@ export class OutlierDetectionLoadBalancer implements LoadBalancer {
|
|||
trace('Counting disabled. Cancelling timer.');
|
||||
this.timerStartTime = null;
|
||||
clearTimeout(this.ejectionTimer);
|
||||
for (const mapEntry of this.addressMap.values()) {
|
||||
for (const mapEntry of this.entryMap.values()) {
|
||||
this.uneject(mapEntry);
|
||||
mapEntry.ejectionTimeMultiplier = 0;
|
||||
}
|
||||
|
|
|
@ -21,6 +21,7 @@ import {
|
|||
TypedLoadBalancingConfig,
|
||||
registerDefaultLoadBalancerType,
|
||||
registerLoadBalancerType,
|
||||
createChildChannelControlHelper,
|
||||
} from './load-balancer';
|
||||
import { ConnectivityState } from './connectivity-state';
|
||||
import {
|
||||
|
@ -31,13 +32,16 @@ import {
|
|||
PickResultType,
|
||||
UnavailablePicker,
|
||||
} from './picker';
|
||||
import { SubchannelAddress } from './subchannel-address';
|
||||
import { Endpoint, SubchannelAddress } from './subchannel-address';
|
||||
import * as logging from './logging';
|
||||
import { LogVerbosity } from './constants';
|
||||
import {
|
||||
SubchannelInterface,
|
||||
ConnectivityStateListener,
|
||||
HealthListener,
|
||||
} from './subchannel-interface';
|
||||
import { isTcpSubchannelAddress } from './subchannel-address';
|
||||
import { isIPv6 } from 'net';
|
||||
|
||||
const TRACER_NAME = 'pick_first';
|
||||
|
||||
|
@ -125,6 +129,39 @@ export function shuffled<T>(list: T[]): T[] {
|
|||
return result;
|
||||
}
|
||||
|
||||
/**
|
||||
* Interleave addresses in addressList by family in accordance with RFC-8304 section 4
|
||||
* @param addressList
|
||||
* @returns
|
||||
*/
|
||||
function interleaveAddressFamilies(
|
||||
addressList: SubchannelAddress[]
|
||||
): SubchannelAddress[] {
|
||||
const result: SubchannelAddress[] = [];
|
||||
const ipv6Addresses: SubchannelAddress[] = [];
|
||||
const ipv4Addresses: SubchannelAddress[] = [];
|
||||
const ipv6First =
|
||||
isTcpSubchannelAddress(addressList[0]) && isIPv6(addressList[0].host);
|
||||
for (const address of addressList) {
|
||||
if (isTcpSubchannelAddress(address) && isIPv6(address.host)) {
|
||||
ipv6Addresses.push(address);
|
||||
} else {
|
||||
ipv4Addresses.push(address);
|
||||
}
|
||||
}
|
||||
const firstList = ipv6First ? ipv6Addresses : ipv4Addresses;
|
||||
const secondList = ipv6First ? ipv4Addresses : ipv6Addresses;
|
||||
for (let i = 0; i < Math.max(firstList.length, secondList.length); i++) {
|
||||
if (i < firstList.length) {
|
||||
result.push(firstList[i]);
|
||||
}
|
||||
if (i < secondList.length) {
|
||||
result.push(secondList[i]);
|
||||
}
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
export class PickFirstLoadBalancer implements LoadBalancer {
|
||||
/**
|
||||
* The list of subchannels this load balancer is currently attempting to
|
||||
|
@ -157,6 +194,9 @@ export class PickFirstLoadBalancer implements LoadBalancer {
|
|||
) => {
|
||||
this.onSubchannelStateUpdate(subchannel, previousState, newState);
|
||||
};
|
||||
|
||||
private pickedSubchannelHealthListener: HealthListener = () =>
|
||||
this.calculateAndReportNewState();
|
||||
/**
|
||||
* Timer reference for the timer tracking when to start
|
||||
*/
|
||||
|
@ -179,7 +219,10 @@ export class PickFirstLoadBalancer implements LoadBalancer {
|
|||
* @param channelControlHelper `ChannelControlHelper` instance provided by
|
||||
* this load balancer's owner.
|
||||
*/
|
||||
constructor(private readonly channelControlHelper: ChannelControlHelper) {
|
||||
constructor(
|
||||
private readonly channelControlHelper: ChannelControlHelper,
|
||||
private reportHealthStatus = false
|
||||
) {
|
||||
this.connectionDelayTimeout = setTimeout(() => {}, 0);
|
||||
clearTimeout(this.connectionDelayTimeout);
|
||||
}
|
||||
|
@ -190,10 +233,19 @@ export class PickFirstLoadBalancer implements LoadBalancer {
|
|||
|
||||
private calculateAndReportNewState() {
|
||||
if (this.currentPick) {
|
||||
this.updateState(
|
||||
ConnectivityState.READY,
|
||||
new PickFirstPicker(this.currentPick)
|
||||
);
|
||||
if (this.reportHealthStatus && !this.currentPick.isHealthy()) {
|
||||
this.updateState(
|
||||
ConnectivityState.TRANSIENT_FAILURE,
|
||||
new UnavailablePicker({
|
||||
details: `Picked subchannel ${this.currentPick.getAddress()} is unhealthy`,
|
||||
})
|
||||
);
|
||||
} else {
|
||||
this.updateState(
|
||||
ConnectivityState.READY,
|
||||
new PickFirstPicker(this.currentPick)
|
||||
);
|
||||
}
|
||||
} else if (this.children.length === 0) {
|
||||
this.updateState(ConnectivityState.IDLE, new QueuePicker(this));
|
||||
} else {
|
||||
|
@ -235,6 +287,11 @@ export class PickFirstLoadBalancer implements LoadBalancer {
|
|||
this.channelControlHelper.removeChannelzChild(
|
||||
currentPick.getChannelzRef()
|
||||
);
|
||||
if (this.reportHealthStatus) {
|
||||
currentPick.removeHealthStateWatcher(
|
||||
this.pickedSubchannelHealthListener
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -306,7 +363,7 @@ export class PickFirstLoadBalancer implements LoadBalancer {
|
|||
this.children[subchannelIndex].subchannel.getAddress()
|
||||
);
|
||||
process.nextTick(() => {
|
||||
this.children[subchannelIndex].subchannel.startConnecting();
|
||||
this.children[subchannelIndex]?.subchannel.startConnecting();
|
||||
});
|
||||
}
|
||||
this.connectionDelayTimeout = setTimeout(() => {
|
||||
|
@ -320,17 +377,12 @@ export class PickFirstLoadBalancer implements LoadBalancer {
|
|||
}
|
||||
trace('Pick subchannel with address ' + subchannel.getAddress());
|
||||
this.stickyTransientFailureMode = false;
|
||||
if (this.currentPick !== null) {
|
||||
this.currentPick.unref();
|
||||
this.channelControlHelper.removeChannelzChild(
|
||||
this.currentPick.getChannelzRef()
|
||||
);
|
||||
this.currentPick.removeConnectivityStateListener(
|
||||
this.subchannelStateListener
|
||||
);
|
||||
}
|
||||
this.removeCurrentPick();
|
||||
this.currentPick = subchannel;
|
||||
subchannel.ref();
|
||||
if (this.reportHealthStatus) {
|
||||
subchannel.addHealthStateWatcher(this.pickedSubchannelHealthListener);
|
||||
}
|
||||
this.channelControlHelper.addChannelzChild(subchannel.getChannelzRef());
|
||||
this.resetSubchannelList();
|
||||
clearTimeout(this.connectionDelayTimeout);
|
||||
|
@ -373,7 +425,7 @@ export class PickFirstLoadBalancer implements LoadBalancer {
|
|||
}
|
||||
|
||||
updateAddressList(
|
||||
addressList: SubchannelAddress[],
|
||||
endpointList: Endpoint[],
|
||||
lbConfig: TypedLoadBalancingConfig
|
||||
): void {
|
||||
if (!(lbConfig instanceof PickFirstLoadBalancingConfig)) {
|
||||
|
@ -383,8 +435,15 @@ export class PickFirstLoadBalancer implements LoadBalancer {
|
|||
* previous update, to minimize churn. Now the DNS resolver is
|
||||
* rate-limited, so that is less of a concern. */
|
||||
if (lbConfig.getShuffleAddressList()) {
|
||||
addressList = shuffled(addressList);
|
||||
endpointList = shuffled(endpointList);
|
||||
}
|
||||
const rawAddressList = ([] as SubchannelAddress[]).concat(
|
||||
...endpointList.map(endpoint => endpoint.addresses)
|
||||
);
|
||||
if (rawAddressList.length === 0) {
|
||||
throw new Error('No addresses in endpoint list passed to pick_first');
|
||||
}
|
||||
const addressList = interleaveAddressFamilies(rawAddressList);
|
||||
const newChildrenList = addressList.map(address => ({
|
||||
subchannel: this.channelControlHelper.createSubchannel(address, {}),
|
||||
hasReportedTransientFailure: false,
|
||||
|
@ -438,6 +497,59 @@ export class PickFirstLoadBalancer implements LoadBalancer {
|
|||
}
|
||||
}
|
||||
|
||||
const LEAF_CONFIG = new PickFirstLoadBalancingConfig(false);
|
||||
|
||||
/**
|
||||
* This class handles the leaf load balancing operations for a single endpoint.
|
||||
* It is a thin wrapper around a PickFirstLoadBalancer with a different API
|
||||
* that more closely reflects how it will be used as a leaf balancer.
|
||||
*/
|
||||
export class LeafLoadBalancer {
|
||||
private pickFirstBalancer: PickFirstLoadBalancer;
|
||||
private latestState: ConnectivityState = ConnectivityState.IDLE;
|
||||
private latestPicker: Picker;
|
||||
constructor(
|
||||
private endpoint: Endpoint,
|
||||
channelControlHelper: ChannelControlHelper
|
||||
) {
|
||||
const childChannelControlHelper = createChildChannelControlHelper(
|
||||
channelControlHelper,
|
||||
{
|
||||
updateState: (connectivityState, picker) => {
|
||||
this.latestState = connectivityState;
|
||||
this.latestPicker = picker;
|
||||
channelControlHelper.updateState(connectivityState, picker);
|
||||
},
|
||||
}
|
||||
);
|
||||
this.pickFirstBalancer = new PickFirstLoadBalancer(
|
||||
childChannelControlHelper,
|
||||
/* reportHealthStatus= */ true
|
||||
);
|
||||
this.latestPicker = new QueuePicker(this.pickFirstBalancer);
|
||||
}
|
||||
|
||||
startConnecting() {
|
||||
this.pickFirstBalancer.updateAddressList([this.endpoint], LEAF_CONFIG);
|
||||
}
|
||||
|
||||
getConnectivityState() {
|
||||
return this.latestState;
|
||||
}
|
||||
|
||||
getPicker() {
|
||||
return this.latestPicker;
|
||||
}
|
||||
|
||||
getEndpoint() {
|
||||
return this.endpoint;
|
||||
}
|
||||
|
||||
destroy() {
|
||||
this.pickFirstBalancer.destroy();
|
||||
}
|
||||
}
|
||||
|
||||
export function setup(): void {
|
||||
registerLoadBalancerType(
|
||||
TYPE_NAME,
|
||||
|
|
|
@ -20,26 +20,24 @@ import {
|
|||
ChannelControlHelper,
|
||||
TypedLoadBalancingConfig,
|
||||
registerLoadBalancerType,
|
||||
createChildChannelControlHelper,
|
||||
} from './load-balancer';
|
||||
import { ConnectivityState } from './connectivity-state';
|
||||
import {
|
||||
QueuePicker,
|
||||
Picker,
|
||||
PickArgs,
|
||||
CompletePickResult,
|
||||
PickResultType,
|
||||
UnavailablePicker,
|
||||
PickResult,
|
||||
} from './picker';
|
||||
import {
|
||||
SubchannelAddress,
|
||||
subchannelAddressToString,
|
||||
} from './subchannel-address';
|
||||
import * as logging from './logging';
|
||||
import { LogVerbosity } from './constants';
|
||||
import {
|
||||
ConnectivityStateListener,
|
||||
SubchannelInterface,
|
||||
} from './subchannel-interface';
|
||||
Endpoint,
|
||||
endpointEqual,
|
||||
endpointToString,
|
||||
} from './subchannel-address';
|
||||
import { LeafLoadBalancer } from './load-balancer-pick-first';
|
||||
|
||||
const TRACER_NAME = 'round_robin';
|
||||
|
||||
|
@ -70,20 +68,14 @@ class RoundRobinLoadBalancingConfig implements TypedLoadBalancingConfig {
|
|||
|
||||
class RoundRobinPicker implements Picker {
|
||||
constructor(
|
||||
private readonly subchannelList: SubchannelInterface[],
|
||||
private readonly children: { endpoint: Endpoint; picker: Picker }[],
|
||||
private nextIndex = 0
|
||||
) {}
|
||||
|
||||
pick(pickArgs: PickArgs): CompletePickResult {
|
||||
const pickedSubchannel = this.subchannelList[this.nextIndex];
|
||||
this.nextIndex = (this.nextIndex + 1) % this.subchannelList.length;
|
||||
return {
|
||||
pickResultType: PickResultType.COMPLETE,
|
||||
subchannel: pickedSubchannel,
|
||||
status: null,
|
||||
onCallStarted: null,
|
||||
onCallEnded: null,
|
||||
};
|
||||
pick(pickArgs: PickArgs): PickResult {
|
||||
const childPicker = this.children[this.nextIndex].picker;
|
||||
this.nextIndex = (this.nextIndex + 1) % this.children.length;
|
||||
return childPicker.pick(pickArgs);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -91,54 +83,51 @@ class RoundRobinPicker implements Picker {
|
|||
* balancer implementation to preserve this part of the picker state if
|
||||
* possible when a subchannel connects or disconnects.
|
||||
*/
|
||||
peekNextSubchannel(): SubchannelInterface {
|
||||
return this.subchannelList[this.nextIndex];
|
||||
peekNextEndpoint(): Endpoint {
|
||||
return this.children[this.nextIndex].endpoint;
|
||||
}
|
||||
}
|
||||
|
||||
export class RoundRobinLoadBalancer implements LoadBalancer {
|
||||
private subchannels: SubchannelInterface[] = [];
|
||||
private children: LeafLoadBalancer[] = [];
|
||||
|
||||
private currentState: ConnectivityState = ConnectivityState.IDLE;
|
||||
|
||||
private subchannelStateListener: ConnectivityStateListener;
|
||||
|
||||
private currentReadyPicker: RoundRobinPicker | null = null;
|
||||
|
||||
constructor(private readonly channelControlHelper: ChannelControlHelper) {
|
||||
this.subchannelStateListener = (
|
||||
subchannel: SubchannelInterface,
|
||||
previousState: ConnectivityState,
|
||||
newState: ConnectivityState
|
||||
) => {
|
||||
this.calculateAndUpdateState();
|
||||
private updatesPaused = false;
|
||||
|
||||
if (
|
||||
newState === ConnectivityState.TRANSIENT_FAILURE ||
|
||||
newState === ConnectivityState.IDLE
|
||||
) {
|
||||
this.channelControlHelper.requestReresolution();
|
||||
subchannel.startConnecting();
|
||||
private childChannelControlHelper: ChannelControlHelper;
|
||||
|
||||
constructor(private readonly channelControlHelper: ChannelControlHelper) {
|
||||
this.childChannelControlHelper = createChildChannelControlHelper(
|
||||
channelControlHelper,
|
||||
{
|
||||
updateState: (connectivityState, picker) => {
|
||||
this.calculateAndUpdateState();
|
||||
},
|
||||
}
|
||||
};
|
||||
);
|
||||
}
|
||||
|
||||
private countSubchannelsWithState(state: ConnectivityState) {
|
||||
return this.subchannels.filter(
|
||||
subchannel => subchannel.getConnectivityState() === state
|
||||
).length;
|
||||
private countChildrenWithState(state: ConnectivityState) {
|
||||
return this.children.filter(child => child.getConnectivityState() === state)
|
||||
.length;
|
||||
}
|
||||
|
||||
private calculateAndUpdateState() {
|
||||
if (this.countSubchannelsWithState(ConnectivityState.READY) > 0) {
|
||||
const readySubchannels = this.subchannels.filter(
|
||||
subchannel =>
|
||||
subchannel.getConnectivityState() === ConnectivityState.READY
|
||||
if (this.updatesPaused) {
|
||||
return;
|
||||
}
|
||||
if (this.countChildrenWithState(ConnectivityState.READY) > 0) {
|
||||
const readyChildren = this.children.filter(
|
||||
child => child.getConnectivityState() === ConnectivityState.READY
|
||||
);
|
||||
let index = 0;
|
||||
if (this.currentReadyPicker !== null) {
|
||||
index = readySubchannels.indexOf(
|
||||
this.currentReadyPicker.peekNextSubchannel()
|
||||
const nextPickedEndpoint = this.currentReadyPicker.peekNextEndpoint();
|
||||
index = readyChildren.findIndex(child =>
|
||||
endpointEqual(child.getEndpoint(), nextPickedEndpoint)
|
||||
);
|
||||
if (index < 0) {
|
||||
index = 0;
|
||||
|
@ -146,14 +135,18 @@ export class RoundRobinLoadBalancer implements LoadBalancer {
|
|||
}
|
||||
this.updateState(
|
||||
ConnectivityState.READY,
|
||||
new RoundRobinPicker(readySubchannels, index)
|
||||
new RoundRobinPicker(
|
||||
readyChildren.map(child => ({
|
||||
endpoint: child.getEndpoint(),
|
||||
picker: child.getPicker(),
|
||||
})),
|
||||
index
|
||||
)
|
||||
);
|
||||
} else if (
|
||||
this.countSubchannelsWithState(ConnectivityState.CONNECTING) > 0
|
||||
) {
|
||||
} else if (this.countChildrenWithState(ConnectivityState.CONNECTING) > 0) {
|
||||
this.updateState(ConnectivityState.CONNECTING, new QueuePicker(this));
|
||||
} else if (
|
||||
this.countSubchannelsWithState(ConnectivityState.TRANSIENT_FAILURE) > 0
|
||||
this.countChildrenWithState(ConnectivityState.TRANSIENT_FAILURE) > 0
|
||||
) {
|
||||
this.updateState(
|
||||
ConnectivityState.TRANSIENT_FAILURE,
|
||||
|
@ -180,51 +173,35 @@ export class RoundRobinLoadBalancer implements LoadBalancer {
|
|||
}
|
||||
|
||||
private resetSubchannelList() {
|
||||
for (const subchannel of this.subchannels) {
|
||||
subchannel.removeConnectivityStateListener(this.subchannelStateListener);
|
||||
subchannel.unref();
|
||||
this.channelControlHelper.removeChannelzChild(
|
||||
subchannel.getChannelzRef()
|
||||
);
|
||||
for (const child of this.children) {
|
||||
child.destroy();
|
||||
}
|
||||
this.subchannels = [];
|
||||
}
|
||||
|
||||
updateAddressList(
|
||||
addressList: SubchannelAddress[],
|
||||
endpointList: Endpoint[],
|
||||
lbConfig: TypedLoadBalancingConfig
|
||||
): void {
|
||||
this.resetSubchannelList();
|
||||
trace(
|
||||
'Connect to address list ' +
|
||||
addressList.map(address => subchannelAddressToString(address))
|
||||
trace('Connect to endpoint list ' + endpointList.map(endpointToString));
|
||||
this.updatesPaused = true;
|
||||
this.children = endpointList.map(
|
||||
endpoint => new LeafLoadBalancer(endpoint, this.childChannelControlHelper)
|
||||
);
|
||||
this.subchannels = addressList.map(address =>
|
||||
this.channelControlHelper.createSubchannel(address, {})
|
||||
);
|
||||
for (const subchannel of this.subchannels) {
|
||||
subchannel.ref();
|
||||
subchannel.addConnectivityStateListener(this.subchannelStateListener);
|
||||
this.channelControlHelper.addChannelzChild(subchannel.getChannelzRef());
|
||||
const subchannelState = subchannel.getConnectivityState();
|
||||
if (
|
||||
subchannelState === ConnectivityState.IDLE ||
|
||||
subchannelState === ConnectivityState.TRANSIENT_FAILURE
|
||||
) {
|
||||
subchannel.startConnecting();
|
||||
}
|
||||
for (const child of this.children) {
|
||||
child.startConnecting();
|
||||
}
|
||||
this.updatesPaused = false;
|
||||
this.calculateAndUpdateState();
|
||||
}
|
||||
|
||||
exitIdle(): void {
|
||||
for (const subchannel of this.subchannels) {
|
||||
subchannel.startConnecting();
|
||||
}
|
||||
/* The round_robin LB policy is only in the IDLE state if it has no
|
||||
* addresses to try to connect to and it has no picked subchannel.
|
||||
* In that case, there is no meaningful action that can be taken here. */
|
||||
}
|
||||
resetBackoff(): void {
|
||||
/* The pick first load balancer does not have a connection backoff, so this
|
||||
* does nothing */
|
||||
// This LB policy has no backoff to reset
|
||||
}
|
||||
destroy(): void {
|
||||
this.resetSubchannelList();
|
||||
|
|
|
@ -16,7 +16,7 @@
|
|||
*/
|
||||
|
||||
import { ChannelOptions } from './channel-options';
|
||||
import { SubchannelAddress } from './subchannel-address';
|
||||
import { Endpoint, SubchannelAddress } from './subchannel-address';
|
||||
import { ConnectivityState } from './connectivity-state';
|
||||
import { Picker } from './picker';
|
||||
import { ChannelRef, SubchannelRef } from './channelz';
|
||||
|
@ -95,12 +95,12 @@ export interface LoadBalancer {
|
|||
* The load balancer will start establishing connections with the new list,
|
||||
* but will continue using any existing connections until the new connections
|
||||
* are established
|
||||
* @param addressList The new list of addresses to connect to
|
||||
* @param endpointList The new list of addresses to connect to
|
||||
* @param lbConfig The load balancing config object from the service config,
|
||||
* if one was provided
|
||||
*/
|
||||
updateAddressList(
|
||||
addressList: SubchannelAddress[],
|
||||
endpointList: Endpoint[],
|
||||
lbConfig: TypedLoadBalancingConfig,
|
||||
attributes: { [key: string]: unknown }
|
||||
): void;
|
||||
|
@ -185,7 +185,9 @@ export function isLoadBalancerNameRegistered(typeName: string): boolean {
|
|||
return typeName in registeredLoadBalancerTypes;
|
||||
}
|
||||
|
||||
export function parseLoadBalancingConfig(rawConfig: LoadBalancingConfig): TypedLoadBalancingConfig {
|
||||
export function parseLoadBalancingConfig(
|
||||
rawConfig: LoadBalancingConfig
|
||||
): TypedLoadBalancingConfig {
|
||||
const keys = Object.keys(rawConfig);
|
||||
if (keys.length !== 1) {
|
||||
throw new Error(
|
||||
|
@ -210,7 +212,9 @@ export function getDefaultConfig() {
|
|||
if (!defaultLoadBalancerType) {
|
||||
throw new Error('No default load balancer type registered');
|
||||
}
|
||||
return new registeredLoadBalancerTypes[defaultLoadBalancerType]!.LoadBalancingConfig();
|
||||
return new registeredLoadBalancerTypes[
|
||||
defaultLoadBalancerType
|
||||
]!.LoadBalancingConfig();
|
||||
}
|
||||
|
||||
export function selectLbConfigFromList(
|
||||
|
@ -221,7 +225,11 @@ export function selectLbConfigFromList(
|
|||
try {
|
||||
return parseLoadBalancingConfig(config);
|
||||
} catch (e) {
|
||||
log(LogVerbosity.DEBUG, 'Config parsing failed with error', (e as Error).message);
|
||||
log(
|
||||
LogVerbosity.DEBUG,
|
||||
'Config parsing failed with error',
|
||||
(e as Error).message
|
||||
);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -97,16 +97,13 @@ export interface Picker {
|
|||
*/
|
||||
export class UnavailablePicker implements Picker {
|
||||
private status: StatusObject;
|
||||
constructor(status?: StatusObject) {
|
||||
if (status !== undefined) {
|
||||
this.status = status;
|
||||
} else {
|
||||
this.status = {
|
||||
code: Status.UNAVAILABLE,
|
||||
details: 'No connection established',
|
||||
metadata: new Metadata(),
|
||||
};
|
||||
}
|
||||
constructor(status?: Partial<StatusObject>) {
|
||||
this.status = {
|
||||
code: Status.UNAVAILABLE,
|
||||
details: 'No connection established',
|
||||
metadata: new Metadata(),
|
||||
...status,
|
||||
};
|
||||
}
|
||||
pick(pickArgs: PickArgs): TransientFailurePickResult {
|
||||
return {
|
||||
|
|
|
@ -28,7 +28,7 @@ import { StatusObject } from './call-interface';
|
|||
import { Metadata } from './metadata';
|
||||
import * as logging from './logging';
|
||||
import { LogVerbosity } from './constants';
|
||||
import { SubchannelAddress, TcpSubchannelAddress } from './subchannel-address';
|
||||
import { Endpoint, TcpSubchannelAddress } from './subchannel-address';
|
||||
import { GrpcUri, uriToString, splitHostPort } from './uri-parser';
|
||||
import { isIPv6, isIPv4 } from 'net';
|
||||
import { ChannelOptions } from './channel-options';
|
||||
|
@ -50,35 +50,11 @@ const DEFAULT_MIN_TIME_BETWEEN_RESOLUTIONS_MS = 30_000;
|
|||
const resolveTxtPromise = util.promisify(dns.resolveTxt);
|
||||
const dnsLookupPromise = util.promisify(dns.lookup);
|
||||
|
||||
/**
|
||||
* Merge any number of arrays into a single alternating array
|
||||
* @param arrays
|
||||
*/
|
||||
function mergeArrays<T>(...arrays: T[][]): T[] {
|
||||
const result: T[] = [];
|
||||
for (
|
||||
let i = 0;
|
||||
i <
|
||||
Math.max.apply(
|
||||
null,
|
||||
arrays.map(array => array.length)
|
||||
);
|
||||
i++
|
||||
) {
|
||||
for (const array of arrays) {
|
||||
if (i < array.length) {
|
||||
result.push(array[i]);
|
||||
}
|
||||
}
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
/**
|
||||
* Resolver implementation that handles DNS names and IP addresses.
|
||||
*/
|
||||
class DnsResolver implements Resolver {
|
||||
private readonly ipResult: SubchannelAddress[] | null;
|
||||
private readonly ipResult: Endpoint[] | null;
|
||||
private readonly dnsHostname: string | null;
|
||||
private readonly port: number | null;
|
||||
/**
|
||||
|
@ -89,7 +65,7 @@ class DnsResolver implements Resolver {
|
|||
private readonly minTimeBetweenResolutionsMs: number;
|
||||
private pendingLookupPromise: Promise<dns.LookupAddress[]> | null = null;
|
||||
private pendingTxtPromise: Promise<string[][]> | null = null;
|
||||
private latestLookupResult: TcpSubchannelAddress[] | null = null;
|
||||
private latestLookupResult: Endpoint[] | null = null;
|
||||
private latestServiceConfig: ServiceConfig | null = null;
|
||||
private latestServiceConfigError: StatusObject | null = null;
|
||||
private percentage: number;
|
||||
|
@ -114,8 +90,12 @@ class DnsResolver implements Resolver {
|
|||
if (isIPv4(hostPort.host) || isIPv6(hostPort.host)) {
|
||||
this.ipResult = [
|
||||
{
|
||||
host: hostPort.host,
|
||||
port: hostPort.port ?? DEFAULT_PORT,
|
||||
addresses: [
|
||||
{
|
||||
host: hostPort.host,
|
||||
port: hostPort.port ?? DEFAULT_PORT,
|
||||
},
|
||||
],
|
||||
},
|
||||
];
|
||||
this.dnsHostname = null;
|
||||
|
@ -213,18 +193,15 @@ class DnsResolver implements Resolver {
|
|||
this.pendingLookupPromise = null;
|
||||
this.backoff.reset();
|
||||
this.backoff.stop();
|
||||
const ip4Addresses: dns.LookupAddress[] = addressList.filter(
|
||||
addr => addr.family === 4
|
||||
);
|
||||
const ip6Addresses: dns.LookupAddress[] = addressList.filter(
|
||||
addr => addr.family === 6
|
||||
);
|
||||
this.latestLookupResult = mergeArrays(ip6Addresses, ip4Addresses).map(
|
||||
const subchannelAddresses: TcpSubchannelAddress[] = addressList.map(
|
||||
addr => ({ host: addr.address, port: +this.port! })
|
||||
);
|
||||
this.latestLookupResult = subchannelAddresses.map(address => ({
|
||||
addresses: [address],
|
||||
}));
|
||||
const allAddressesString: string =
|
||||
'[' +
|
||||
this.latestLookupResult
|
||||
subchannelAddresses
|
||||
.map(addr => addr.host + ':' + addr.port)
|
||||
.join(',') +
|
||||
']';
|
||||
|
|
|
@ -20,7 +20,7 @@ import { ChannelOptions } from './channel-options';
|
|||
import { LogVerbosity, Status } from './constants';
|
||||
import { Metadata } from './metadata';
|
||||
import { registerResolver, Resolver, ResolverListener } from './resolver';
|
||||
import { SubchannelAddress } from './subchannel-address';
|
||||
import { Endpoint, SubchannelAddress } from './subchannel-address';
|
||||
import { GrpcUri, splitHostPort, uriToString } from './uri-parser';
|
||||
import * as logging from './logging';
|
||||
|
||||
|
@ -39,7 +39,7 @@ const IPV6_SCHEME = 'ipv6';
|
|||
const DEFAULT_PORT = 443;
|
||||
|
||||
class IpResolver implements Resolver {
|
||||
private addresses: SubchannelAddress[] = [];
|
||||
private endpoints: Endpoint[] = [];
|
||||
private error: StatusObject | null = null;
|
||||
constructor(
|
||||
target: GrpcUri,
|
||||
|
@ -83,8 +83,8 @@ class IpResolver implements Resolver {
|
|||
port: hostPort.port ?? DEFAULT_PORT,
|
||||
});
|
||||
}
|
||||
this.addresses = addresses;
|
||||
trace('Parsed ' + target.scheme + ' address list ' + this.addresses);
|
||||
this.endpoints = addresses.map(address => ({ addresses: [address] }));
|
||||
trace('Parsed ' + target.scheme + ' address list ' + addresses);
|
||||
}
|
||||
updateResolution(): void {
|
||||
process.nextTick(() => {
|
||||
|
@ -92,7 +92,7 @@ class IpResolver implements Resolver {
|
|||
this.listener.onError(this.error);
|
||||
} else {
|
||||
this.listener.onSuccessfulResolution(
|
||||
this.addresses,
|
||||
this.endpoints,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
|
|
|
@ -15,12 +15,12 @@
|
|||
*/
|
||||
|
||||
import { Resolver, ResolverListener, registerResolver } from './resolver';
|
||||
import { SubchannelAddress } from './subchannel-address';
|
||||
import { Endpoint } from './subchannel-address';
|
||||
import { GrpcUri } from './uri-parser';
|
||||
import { ChannelOptions } from './channel-options';
|
||||
|
||||
class UdsResolver implements Resolver {
|
||||
private addresses: SubchannelAddress[] = [];
|
||||
private endpoints: Endpoint[] = [];
|
||||
constructor(
|
||||
target: GrpcUri,
|
||||
private listener: ResolverListener,
|
||||
|
@ -32,12 +32,12 @@ class UdsResolver implements Resolver {
|
|||
} else {
|
||||
path = target.path;
|
||||
}
|
||||
this.addresses = [{ path }];
|
||||
this.endpoints = [{ addresses: [{ path }] }];
|
||||
}
|
||||
updateResolution(): void {
|
||||
process.nextTick(
|
||||
this.listener.onSuccessfulResolution,
|
||||
this.addresses,
|
||||
this.endpoints,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
|
|
|
@ -17,7 +17,7 @@
|
|||
|
||||
import { MethodConfig, ServiceConfig } from './service-config';
|
||||
import { StatusObject } from './call-interface';
|
||||
import { SubchannelAddress } from './subchannel-address';
|
||||
import { Endpoint } from './subchannel-address';
|
||||
import { GrpcUri, uriToString } from './uri-parser';
|
||||
import { ChannelOptions } from './channel-options';
|
||||
import { Metadata } from './metadata';
|
||||
|
@ -55,7 +55,7 @@ export interface ResolverListener {
|
|||
* service configuration was invalid
|
||||
*/
|
||||
onSuccessfulResolution(
|
||||
addressList: SubchannelAddress[],
|
||||
addressList: Endpoint[],
|
||||
serviceConfig: ServiceConfig | null,
|
||||
serviceConfigError: StatusObject | null,
|
||||
configSelector: ConfigSelector | null,
|
||||
|
|
|
@ -32,7 +32,7 @@ import { StatusObject } from './call-interface';
|
|||
import { Metadata } from './metadata';
|
||||
import * as logging from './logging';
|
||||
import { LogVerbosity } from './constants';
|
||||
import { SubchannelAddress } from './subchannel-address';
|
||||
import { Endpoint } from './subchannel-address';
|
||||
import { GrpcUri, uriToString } from './uri-parser';
|
||||
import { ChildLoadBalancerHandler } from './load-balancer-child-handler';
|
||||
import { ChannelOptions } from './channel-options';
|
||||
|
@ -177,7 +177,7 @@ export class ResolvingLoadBalancer implements LoadBalancer {
|
|||
target,
|
||||
{
|
||||
onSuccessfulResolution: (
|
||||
addressList: SubchannelAddress[],
|
||||
endpointList: Endpoint[],
|
||||
serviceConfig: ServiceConfig | null,
|
||||
serviceConfigError: ServiceError | null,
|
||||
configSelector: ConfigSelector | null,
|
||||
|
@ -226,7 +226,7 @@ export class ResolvingLoadBalancer implements LoadBalancer {
|
|||
return;
|
||||
}
|
||||
this.childLoadBalancer.updateAddressList(
|
||||
addressList,
|
||||
endpointList,
|
||||
loadBalancingConfig,
|
||||
attributes
|
||||
);
|
||||
|
@ -307,7 +307,7 @@ export class ResolvingLoadBalancer implements LoadBalancer {
|
|||
}
|
||||
|
||||
updateAddressList(
|
||||
addressList: SubchannelAddress[],
|
||||
endpointList: Endpoint[],
|
||||
lbConfig: TypedLoadBalancingConfig | null
|
||||
): never {
|
||||
throw new Error('updateAddressList not supported on ResolvingLoadBalancer');
|
||||
|
|
|
@ -631,12 +631,15 @@ export class Server {
|
|||
|
||||
const resolverListener: ResolverListener = {
|
||||
onSuccessfulResolution: (
|
||||
addressList,
|
||||
endpointList,
|
||||
serviceConfig,
|
||||
serviceConfigError
|
||||
) => {
|
||||
// We only want one resolution result. Discard all future results
|
||||
resolverListener.onSuccessfulResolution = () => {};
|
||||
const addressList = ([] as SubchannelAddress[]).concat(
|
||||
...endpointList.map(endpoint => endpoint.addresses)
|
||||
);
|
||||
if (addressList.length === 0) {
|
||||
deferredCallback(
|
||||
new Error(`No addresses resolved for port ${port}`),
|
||||
|
|
|
@ -86,3 +86,39 @@ export function stringToSubchannelAddress(
|
|||
};
|
||||
}
|
||||
}
|
||||
|
||||
export interface Endpoint {
|
||||
addresses: SubchannelAddress[];
|
||||
}
|
||||
|
||||
export function endpointEqual(endpoint1: Endpoint, endpoint2: Endpoint) {
|
||||
if (endpoint1.addresses.length !== endpoint2.addresses.length) {
|
||||
return false;
|
||||
}
|
||||
for (let i = 0; i < endpoint1.addresses.length; i++) {
|
||||
if (
|
||||
!subchannelAddressEqual(endpoint1.addresses[i], endpoint2.addresses[i])
|
||||
) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
export function endpointToString(endpoint: Endpoint): string {
|
||||
return (
|
||||
'[' + endpoint.addresses.map(subchannelAddressToString).join(', ') + ']'
|
||||
);
|
||||
}
|
||||
|
||||
export function endpointHasAddress(
|
||||
endpoint: Endpoint,
|
||||
expectedAddress: SubchannelAddress
|
||||
): boolean {
|
||||
for (const address of endpoint.addresses) {
|
||||
if (subchannelAddressEqual(address, expectedAddress)) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
|
|
@ -26,6 +26,8 @@ export type ConnectivityStateListener = (
|
|||
keepaliveTime: number
|
||||
) => void;
|
||||
|
||||
export type HealthListener = (healthy: boolean) => void;
|
||||
|
||||
/**
|
||||
* This is an interface for load balancing policies to use to interact with
|
||||
* subchannels. This allows load balancing policies to wrap and unwrap
|
||||
|
@ -45,6 +47,9 @@ export interface SubchannelInterface {
|
|||
ref(): void;
|
||||
unref(): void;
|
||||
getChannelzRef(): SubchannelRef;
|
||||
isHealthy(): boolean;
|
||||
addHealthStateWatcher(listener: HealthListener): void;
|
||||
removeHealthStateWatcher(listener: HealthListener): void;
|
||||
/**
|
||||
* If this is a wrapper, return the wrapped subchannel, otherwise return this
|
||||
*/
|
||||
|
@ -58,7 +63,23 @@ export interface SubchannelInterface {
|
|||
}
|
||||
|
||||
export abstract class BaseSubchannelWrapper implements SubchannelInterface {
|
||||
constructor(protected child: SubchannelInterface) {}
|
||||
private healthy = true;
|
||||
private healthListeners: Set<HealthListener> = new Set();
|
||||
constructor(protected child: SubchannelInterface) {
|
||||
child.addHealthStateWatcher(childHealthy => {
|
||||
/* A change to the child health state only affects this wrapper's overall
|
||||
* health state if this wrapper is reporting healthy. */
|
||||
if (this.healthy) {
|
||||
this.updateHealthListeners();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private updateHealthListeners(): void {
|
||||
for (const listener of this.healthListeners) {
|
||||
listener(this.isHealthy());
|
||||
}
|
||||
}
|
||||
|
||||
getConnectivityState(): ConnectivityState {
|
||||
return this.child.getConnectivityState();
|
||||
|
@ -87,6 +108,25 @@ export abstract class BaseSubchannelWrapper implements SubchannelInterface {
|
|||
getChannelzRef(): SubchannelRef {
|
||||
return this.child.getChannelzRef();
|
||||
}
|
||||
isHealthy(): boolean {
|
||||
return this.healthy && this.child.isHealthy();
|
||||
}
|
||||
addHealthStateWatcher(listener: HealthListener): void {
|
||||
this.healthListeners.add(listener);
|
||||
}
|
||||
removeHealthStateWatcher(listener: HealthListener): void {
|
||||
this.healthListeners.delete(listener);
|
||||
}
|
||||
protected setHealthy(healthy: boolean): void {
|
||||
if (healthy !== this.healthy) {
|
||||
this.healthy = healthy;
|
||||
/* A change to this wrapper's health state only affects the overall
|
||||
* reported health state if the child is healthy. */
|
||||
if (this.child.isHealthy()) {
|
||||
this.updateHealthListeners();
|
||||
}
|
||||
}
|
||||
}
|
||||
getRealSubchannel(): Subchannel {
|
||||
return this.child.getRealSubchannel();
|
||||
}
|
||||
|
|
|
@ -461,6 +461,18 @@ export class Subchannel {
|
|||
return this.channelzRef;
|
||||
}
|
||||
|
||||
isHealthy(): boolean {
|
||||
return true;
|
||||
}
|
||||
|
||||
addHealthStateWatcher(listener: (healthy: boolean) => void): void {
|
||||
// Do nothing with the listener
|
||||
}
|
||||
|
||||
removeHealthStateWatcher(listener: (healthy: boolean) => void): void {
|
||||
// Do nothing with the listener
|
||||
}
|
||||
|
||||
getRealSubchannel(): this {
|
||||
return this;
|
||||
}
|
||||
|
|
|
@ -27,7 +27,10 @@ import {
|
|||
loadPackageDefinition,
|
||||
} from '../src/make-client';
|
||||
import { readFileSync } from 'fs';
|
||||
import { SubchannelInterface } from '../src/subchannel-interface';
|
||||
import {
|
||||
HealthListener,
|
||||
SubchannelInterface,
|
||||
} from '../src/subchannel-interface';
|
||||
import { SubchannelRef } from '../src/channelz';
|
||||
import { Subchannel } from '../src/subchannel';
|
||||
import { ConnectivityState } from '../src/connectivity-state';
|
||||
|
@ -198,6 +201,11 @@ export class MockSubchannel implements SubchannelInterface {
|
|||
realSubchannelEquals(other: grpc.experimental.SubchannelInterface): boolean {
|
||||
return this === other;
|
||||
}
|
||||
isHealthy(): boolean {
|
||||
return true;
|
||||
}
|
||||
addHealthStateWatcher(listener: HealthListener): void {}
|
||||
removeHealthStateWatcher(listener: HealthListener): void {}
|
||||
}
|
||||
|
||||
export { assert2 };
|
||||
|
|
|
@ -29,10 +29,7 @@ import {
|
|||
} from '../src/load-balancer-pick-first';
|
||||
import { Metadata } from '../src/metadata';
|
||||
import { Picker } from '../src/picker';
|
||||
import {
|
||||
SubchannelAddress,
|
||||
subchannelAddressToString,
|
||||
} from '../src/subchannel-address';
|
||||
import { Endpoint, subchannelAddressToString } from '../src/subchannel-address';
|
||||
import { MockSubchannel, TestClient, TestServer } from './common';
|
||||
|
||||
function updateStateCallBackForExpectedStateSequence(
|
||||
|
@ -120,11 +117,62 @@ describe('pick_first load balancing policy', () => {
|
|||
}
|
||||
);
|
||||
const pickFirst = new PickFirstLoadBalancer(channelControlHelper);
|
||||
pickFirst.updateAddressList([{ host: 'localhost', port: 1 }], config);
|
||||
pickFirst.updateAddressList(
|
||||
[{ addresses: [{ host: 'localhost', port: 1 }] }],
|
||||
config
|
||||
);
|
||||
process.nextTick(() => {
|
||||
subchannels[0].transitionToState(ConnectivityState.READY);
|
||||
});
|
||||
});
|
||||
it('Should report READY when a subchannel other than the first connects', done => {
|
||||
const channelControlHelper = createChildChannelControlHelper(
|
||||
baseChannelControlHelper,
|
||||
{
|
||||
updateState: updateStateCallBackForExpectedStateSequence(
|
||||
[ConnectivityState.CONNECTING, ConnectivityState.READY],
|
||||
done
|
||||
),
|
||||
}
|
||||
);
|
||||
const pickFirst = new PickFirstLoadBalancer(channelControlHelper);
|
||||
pickFirst.updateAddressList(
|
||||
[
|
||||
{ addresses: [{ host: 'localhost', port: 1 }] },
|
||||
{ addresses: [{ host: 'localhost', port: 2 }] },
|
||||
],
|
||||
config
|
||||
);
|
||||
process.nextTick(() => {
|
||||
subchannels[1].transitionToState(ConnectivityState.READY);
|
||||
});
|
||||
});
|
||||
it('Should report READY when a subchannel other than the first in the same endpoint connects', done => {
|
||||
const channelControlHelper = createChildChannelControlHelper(
|
||||
baseChannelControlHelper,
|
||||
{
|
||||
updateState: updateStateCallBackForExpectedStateSequence(
|
||||
[ConnectivityState.CONNECTING, ConnectivityState.READY],
|
||||
done
|
||||
),
|
||||
}
|
||||
);
|
||||
const pickFirst = new PickFirstLoadBalancer(channelControlHelper);
|
||||
pickFirst.updateAddressList(
|
||||
[
|
||||
{
|
||||
addresses: [
|
||||
{ host: 'localhost', port: 1 },
|
||||
{ host: 'localhost', port: 2 },
|
||||
],
|
||||
},
|
||||
],
|
||||
config
|
||||
);
|
||||
process.nextTick(() => {
|
||||
subchannels[1].transitionToState(ConnectivityState.READY);
|
||||
});
|
||||
});
|
||||
it('Should report READY when updated with a subchannel that is already READY', done => {
|
||||
const channelControlHelper = createChildChannelControlHelper(
|
||||
baseChannelControlHelper,
|
||||
|
@ -144,7 +192,10 @@ describe('pick_first load balancing policy', () => {
|
|||
}
|
||||
);
|
||||
const pickFirst = new PickFirstLoadBalancer(channelControlHelper);
|
||||
pickFirst.updateAddressList([{ host: 'localhost', port: 1 }], config);
|
||||
pickFirst.updateAddressList(
|
||||
[{ addresses: [{ host: 'localhost', port: 1 }] }],
|
||||
config
|
||||
);
|
||||
});
|
||||
it('Should stay CONNECTING if only some subchannels fail to connect', done => {
|
||||
const channelControlHelper = createChildChannelControlHelper(
|
||||
|
@ -159,8 +210,8 @@ describe('pick_first load balancing policy', () => {
|
|||
const pickFirst = new PickFirstLoadBalancer(channelControlHelper);
|
||||
pickFirst.updateAddressList(
|
||||
[
|
||||
{ host: 'localhost', port: 1 },
|
||||
{ host: 'localhost', port: 2 },
|
||||
{ addresses: [{ host: 'localhost', port: 1 }] },
|
||||
{ addresses: [{ host: 'localhost', port: 2 }] },
|
||||
],
|
||||
config
|
||||
);
|
||||
|
@ -181,8 +232,8 @@ describe('pick_first load balancing policy', () => {
|
|||
const pickFirst = new PickFirstLoadBalancer(channelControlHelper);
|
||||
pickFirst.updateAddressList(
|
||||
[
|
||||
{ host: 'localhost', port: 1 },
|
||||
{ host: 'localhost', port: 2 },
|
||||
{ addresses: [{ host: 'localhost', port: 1 }] },
|
||||
{ addresses: [{ host: 'localhost', port: 2 }] },
|
||||
],
|
||||
config
|
||||
);
|
||||
|
@ -206,8 +257,8 @@ describe('pick_first load balancing policy', () => {
|
|||
const pickFirst = new PickFirstLoadBalancer(channelControlHelper);
|
||||
pickFirst.updateAddressList(
|
||||
[
|
||||
{ host: 'localhost', port: 1 },
|
||||
{ host: 'localhost', port: 2 },
|
||||
{ addresses: [{ host: 'localhost', port: 1 }] },
|
||||
{ addresses: [{ host: 'localhost', port: 2 }] },
|
||||
],
|
||||
config
|
||||
);
|
||||
|
@ -245,8 +296,8 @@ describe('pick_first load balancing policy', () => {
|
|||
const pickFirst = new PickFirstLoadBalancer(channelControlHelper);
|
||||
pickFirst.updateAddressList(
|
||||
[
|
||||
{ host: 'localhost', port: 1 },
|
||||
{ host: 'localhost', port: 2 },
|
||||
{ addresses: [{ host: 'localhost', port: 1 }] },
|
||||
{ addresses: [{ host: 'localhost', port: 2 }] },
|
||||
],
|
||||
config
|
||||
);
|
||||
|
@ -272,8 +323,8 @@ describe('pick_first load balancing policy', () => {
|
|||
const pickFirst = new PickFirstLoadBalancer(channelControlHelper);
|
||||
pickFirst.updateAddressList(
|
||||
[
|
||||
{ host: 'localhost', port: 1 },
|
||||
{ host: 'localhost', port: 2 },
|
||||
{ addresses: [{ host: 'localhost', port: 1 }] },
|
||||
{ addresses: [{ host: 'localhost', port: 2 }] },
|
||||
],
|
||||
config
|
||||
);
|
||||
|
@ -303,8 +354,8 @@ describe('pick_first load balancing policy', () => {
|
|||
const pickFirst = new PickFirstLoadBalancer(channelControlHelper);
|
||||
pickFirst.updateAddressList(
|
||||
[
|
||||
{ host: 'localhost', port: 1 },
|
||||
{ host: 'localhost', port: 2 },
|
||||
{ addresses: [{ host: 'localhost', port: 1 }] },
|
||||
{ addresses: [{ host: 'localhost', port: 2 }] },
|
||||
],
|
||||
config
|
||||
);
|
||||
|
@ -312,8 +363,8 @@ describe('pick_first load balancing policy', () => {
|
|||
currentStartState = ConnectivityState.CONNECTING;
|
||||
pickFirst.updateAddressList(
|
||||
[
|
||||
{ host: 'localhost', port: 3 },
|
||||
{ host: 'localhost', port: 4 },
|
||||
{ addresses: [{ host: 'localhost', port: 1 }] },
|
||||
{ addresses: [{ host: 'localhost', port: 2 }] },
|
||||
],
|
||||
config
|
||||
);
|
||||
|
@ -341,14 +392,17 @@ describe('pick_first load balancing policy', () => {
|
|||
const pickFirst = new PickFirstLoadBalancer(channelControlHelper);
|
||||
pickFirst.updateAddressList(
|
||||
[
|
||||
{ host: 'localhost', port: 1 },
|
||||
{ host: 'localhost', port: 2 },
|
||||
{ addresses: [{ host: 'localhost', port: 1 }] },
|
||||
{ addresses: [{ host: 'localhost', port: 2 }] },
|
||||
],
|
||||
config
|
||||
);
|
||||
process.nextTick(() => {
|
||||
currentStartState = ConnectivityState.READY;
|
||||
pickFirst.updateAddressList([{ host: 'localhost', port: 3 }], config);
|
||||
pickFirst.updateAddressList(
|
||||
[{ addresses: [{ host: 'localhost', port: 3 }] }],
|
||||
config
|
||||
);
|
||||
});
|
||||
});
|
||||
it('Should transition from READY to IDLE if the connected subchannel disconnects', done => {
|
||||
|
@ -371,7 +425,10 @@ describe('pick_first load balancing policy', () => {
|
|||
}
|
||||
);
|
||||
const pickFirst = new PickFirstLoadBalancer(channelControlHelper);
|
||||
pickFirst.updateAddressList([{ host: 'localhost', port: 1 }], config);
|
||||
pickFirst.updateAddressList(
|
||||
[{ addresses: [{ host: 'localhost', port: 1 }] }],
|
||||
config
|
||||
);
|
||||
process.nextTick(() => {
|
||||
subchannels[0].transitionToState(ConnectivityState.IDLE);
|
||||
});
|
||||
|
@ -396,10 +453,16 @@ describe('pick_first load balancing policy', () => {
|
|||
}
|
||||
);
|
||||
const pickFirst = new PickFirstLoadBalancer(channelControlHelper);
|
||||
pickFirst.updateAddressList([{ host: 'localhost', port: 1 }], config);
|
||||
pickFirst.updateAddressList(
|
||||
[{ addresses: [{ host: 'localhost', port: 1 }] }],
|
||||
config
|
||||
);
|
||||
process.nextTick(() => {
|
||||
currentStartState = ConnectivityState.IDLE;
|
||||
pickFirst.updateAddressList([{ host: 'localhost', port: 2 }], config);
|
||||
pickFirst.updateAddressList(
|
||||
[{ addresses: [{ host: 'localhost', port: 2 }] }],
|
||||
config
|
||||
);
|
||||
process.nextTick(() => {
|
||||
subchannels[0].transitionToState(ConnectivityState.IDLE);
|
||||
});
|
||||
|
@ -425,10 +488,16 @@ describe('pick_first load balancing policy', () => {
|
|||
}
|
||||
);
|
||||
const pickFirst = new PickFirstLoadBalancer(channelControlHelper);
|
||||
pickFirst.updateAddressList([{ host: 'localhost', port: 1 }], config);
|
||||
pickFirst.updateAddressList(
|
||||
[{ addresses: [{ host: 'localhost', port: 1 }] }],
|
||||
config
|
||||
);
|
||||
process.nextTick(() => {
|
||||
currentStartState = ConnectivityState.TRANSIENT_FAILURE;
|
||||
pickFirst.updateAddressList([{ host: 'localhost', port: 2 }], config);
|
||||
pickFirst.updateAddressList(
|
||||
[{ addresses: [{ host: 'localhost', port: 2 }] }],
|
||||
config
|
||||
);
|
||||
process.nextTick(() => {
|
||||
subchannels[0].transitionToState(ConnectivityState.IDLE);
|
||||
});
|
||||
|
@ -454,9 +523,15 @@ describe('pick_first load balancing policy', () => {
|
|||
}
|
||||
);
|
||||
const pickFirst = new PickFirstLoadBalancer(channelControlHelper);
|
||||
pickFirst.updateAddressList([{ host: 'localhost', port: 1 }], config);
|
||||
pickFirst.updateAddressList(
|
||||
[{ addresses: [{ host: 'localhost', port: 1 }] }],
|
||||
config
|
||||
);
|
||||
process.nextTick(() => {
|
||||
pickFirst.updateAddressList([{ host: 'localhost', port: 2 }], config);
|
||||
pickFirst.updateAddressList(
|
||||
[{ addresses: [{ host: 'localhost', port: 2 }] }],
|
||||
config
|
||||
);
|
||||
process.nextTick(() => {
|
||||
subchannels[0].transitionToState(ConnectivityState.IDLE);
|
||||
});
|
||||
|
@ -490,24 +565,24 @@ describe('pick_first load balancing policy', () => {
|
|||
},
|
||||
}
|
||||
);
|
||||
const addresses: SubchannelAddress[] = [];
|
||||
const endpoints: Endpoint[] = [];
|
||||
for (let i = 0; i < 10; i++) {
|
||||
addresses.push({ host: 'localhost', port: i + 1 });
|
||||
endpoints.push({ addresses: [{ host: 'localhost', port: i + 1 }] });
|
||||
}
|
||||
const pickFirst = new PickFirstLoadBalancer(channelControlHelper);
|
||||
/* Pick from 10 subchannels 5 times, with address randomization enabled,
|
||||
* and verify that at least two different subchannels are picked. The
|
||||
* probability choosing the same address every time is 1/10,000, which
|
||||
* I am considering an acceptable flake rate */
|
||||
pickFirst.updateAddressList(addresses, shuffleConfig);
|
||||
pickFirst.updateAddressList(endpoints, shuffleConfig);
|
||||
process.nextTick(() => {
|
||||
pickFirst.updateAddressList(addresses, shuffleConfig);
|
||||
pickFirst.updateAddressList(endpoints, shuffleConfig);
|
||||
process.nextTick(() => {
|
||||
pickFirst.updateAddressList(addresses, shuffleConfig);
|
||||
pickFirst.updateAddressList(endpoints, shuffleConfig);
|
||||
process.nextTick(() => {
|
||||
pickFirst.updateAddressList(addresses, shuffleConfig);
|
||||
pickFirst.updateAddressList(endpoints, shuffleConfig);
|
||||
process.nextTick(() => {
|
||||
pickFirst.updateAddressList(addresses, shuffleConfig);
|
||||
pickFirst.updateAddressList(endpoints, shuffleConfig);
|
||||
process.nextTick(() => {
|
||||
assert(pickedSubchannels.size > 1);
|
||||
done();
|
||||
|
@ -546,20 +621,20 @@ describe('pick_first load balancing policy', () => {
|
|||
},
|
||||
}
|
||||
);
|
||||
const addresses: SubchannelAddress[] = [];
|
||||
const endpoints: Endpoint[] = [];
|
||||
for (let i = 0; i < 10; i++) {
|
||||
addresses.push({ host: 'localhost', port: i + 1 });
|
||||
endpoints.push({ addresses: [{ host: 'localhost', port: i + 1 }] });
|
||||
}
|
||||
const pickFirst = new PickFirstLoadBalancer(channelControlHelper);
|
||||
pickFirst.updateAddressList(addresses, config);
|
||||
pickFirst.updateAddressList(endpoints, config);
|
||||
process.nextTick(() => {
|
||||
pickFirst.updateAddressList(addresses, config);
|
||||
pickFirst.updateAddressList(endpoints, config);
|
||||
process.nextTick(() => {
|
||||
pickFirst.updateAddressList(addresses, config);
|
||||
pickFirst.updateAddressList(endpoints, config);
|
||||
process.nextTick(() => {
|
||||
pickFirst.updateAddressList(addresses, config);
|
||||
pickFirst.updateAddressList(endpoints, config);
|
||||
process.nextTick(() => {
|
||||
pickFirst.updateAddressList(addresses, config);
|
||||
pickFirst.updateAddressList(endpoints, config);
|
||||
process.nextTick(() => {
|
||||
assert(pickedSubchannels.size === 1);
|
||||
done();
|
||||
|
|
|
@ -25,12 +25,27 @@ import * as resolver_ip from '../src/resolver-ip';
|
|||
import { ServiceConfig } from '../src/service-config';
|
||||
import { StatusObject } from '../src/call-interface';
|
||||
import {
|
||||
Endpoint,
|
||||
SubchannelAddress,
|
||||
isTcpSubchannelAddress,
|
||||
subchannelAddressToString,
|
||||
endpointToString,
|
||||
subchannelAddressEqual,
|
||||
} from '../src/subchannel-address';
|
||||
import { parseUri, GrpcUri } from '../src/uri-parser';
|
||||
|
||||
function hasMatchingAddress(
|
||||
endpointList: Endpoint[],
|
||||
expectedAddress: SubchannelAddress
|
||||
): boolean {
|
||||
for (const endpoint of endpointList) {
|
||||
for (const address of endpoint.addresses) {
|
||||
if (subchannelAddressEqual(address, expectedAddress)) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
describe('Name Resolver', () => {
|
||||
before(() => {
|
||||
resolver_dns.setup();
|
||||
|
@ -46,27 +61,17 @@ describe('Name Resolver', () => {
|
|||
)!;
|
||||
const listener: resolverManager.ResolverListener = {
|
||||
onSuccessfulResolution: (
|
||||
addressList: SubchannelAddress[],
|
||||
endpointList: Endpoint[],
|
||||
serviceConfig: ServiceConfig | null,
|
||||
serviceConfigError: StatusObject | null
|
||||
) => {
|
||||
// Only handle the first resolution result
|
||||
listener.onSuccessfulResolution = () => {};
|
||||
assert(
|
||||
addressList.some(
|
||||
addr =>
|
||||
isTcpSubchannelAddress(addr) &&
|
||||
addr.host === '127.0.0.1' &&
|
||||
addr.port === 50051
|
||||
)
|
||||
hasMatchingAddress(endpointList, { host: '127.0.0.1', port: 50051 })
|
||||
);
|
||||
assert(
|
||||
addressList.some(
|
||||
addr =>
|
||||
isTcpSubchannelAddress(addr) &&
|
||||
addr.host === '::1' &&
|
||||
addr.port === 50051
|
||||
)
|
||||
hasMatchingAddress(endpointList, { host: '::1', port: 50051 })
|
||||
);
|
||||
done();
|
||||
},
|
||||
|
@ -83,28 +88,16 @@ describe('Name Resolver', () => {
|
|||
)!;
|
||||
const listener: resolverManager.ResolverListener = {
|
||||
onSuccessfulResolution: (
|
||||
addressList: SubchannelAddress[],
|
||||
endpointList: Endpoint[],
|
||||
serviceConfig: ServiceConfig | null,
|
||||
serviceConfigError: StatusObject | null
|
||||
) => {
|
||||
// Only handle the first resolution result
|
||||
listener.onSuccessfulResolution = () => {};
|
||||
assert(
|
||||
addressList.some(
|
||||
addr =>
|
||||
isTcpSubchannelAddress(addr) &&
|
||||
addr.host === '127.0.0.1' &&
|
||||
addr.port === 443
|
||||
)
|
||||
);
|
||||
assert(
|
||||
addressList.some(
|
||||
addr =>
|
||||
isTcpSubchannelAddress(addr) &&
|
||||
addr.host === '::1' &&
|
||||
addr.port === 443
|
||||
)
|
||||
hasMatchingAddress(endpointList, { host: '127.0.0.1', port: 443 })
|
||||
);
|
||||
assert(hasMatchingAddress(endpointList, { host: '::1', port: 443 }));
|
||||
done();
|
||||
},
|
||||
onError: (error: StatusObject) => {
|
||||
|
@ -118,19 +111,14 @@ describe('Name Resolver', () => {
|
|||
const target = resolverManager.mapUriDefaultScheme(parseUri('1.2.3.4')!)!;
|
||||
const listener: resolverManager.ResolverListener = {
|
||||
onSuccessfulResolution: (
|
||||
addressList: SubchannelAddress[],
|
||||
endpointList: Endpoint[],
|
||||
serviceConfig: ServiceConfig | null,
|
||||
serviceConfigError: StatusObject | null
|
||||
) => {
|
||||
// Only handle the first resolution result
|
||||
listener.onSuccessfulResolution = () => {};
|
||||
assert(
|
||||
addressList.some(
|
||||
addr =>
|
||||
isTcpSubchannelAddress(addr) &&
|
||||
addr.host === '1.2.3.4' &&
|
||||
addr.port === 443
|
||||
)
|
||||
hasMatchingAddress(endpointList, { host: '1.2.3.4', port: 443 })
|
||||
);
|
||||
done();
|
||||
},
|
||||
|
@ -145,20 +133,13 @@ describe('Name Resolver', () => {
|
|||
const target = resolverManager.mapUriDefaultScheme(parseUri('::1')!)!;
|
||||
const listener: resolverManager.ResolverListener = {
|
||||
onSuccessfulResolution: (
|
||||
addressList: SubchannelAddress[],
|
||||
endpointList: Endpoint[],
|
||||
serviceConfig: ServiceConfig | null,
|
||||
serviceConfigError: StatusObject | null
|
||||
) => {
|
||||
// Only handle the first resolution result
|
||||
listener.onSuccessfulResolution = () => {};
|
||||
assert(
|
||||
addressList.some(
|
||||
addr =>
|
||||
isTcpSubchannelAddress(addr) &&
|
||||
addr.host === '::1' &&
|
||||
addr.port === 443
|
||||
)
|
||||
);
|
||||
assert(hasMatchingAddress(endpointList, { host: '::1', port: 443 }));
|
||||
done();
|
||||
},
|
||||
onError: (error: StatusObject) => {
|
||||
|
@ -174,19 +155,14 @@ describe('Name Resolver', () => {
|
|||
)!;
|
||||
const listener: resolverManager.ResolverListener = {
|
||||
onSuccessfulResolution: (
|
||||
addressList: SubchannelAddress[],
|
||||
endpointList: Endpoint[],
|
||||
serviceConfig: ServiceConfig | null,
|
||||
serviceConfigError: StatusObject | null
|
||||
) => {
|
||||
// Only handle the first resolution result
|
||||
listener.onSuccessfulResolution = () => {};
|
||||
assert(
|
||||
addressList.some(
|
||||
addr =>
|
||||
isTcpSubchannelAddress(addr) &&
|
||||
addr.host === '::1' &&
|
||||
addr.port === 50051
|
||||
)
|
||||
hasMatchingAddress(endpointList, { host: '::1', port: 50051 })
|
||||
);
|
||||
done();
|
||||
},
|
||||
|
@ -203,13 +179,13 @@ describe('Name Resolver', () => {
|
|||
)!;
|
||||
const listener: resolverManager.ResolverListener = {
|
||||
onSuccessfulResolution: (
|
||||
addressList: SubchannelAddress[],
|
||||
endpointList: Endpoint[],
|
||||
serviceConfig: ServiceConfig | null,
|
||||
serviceConfigError: StatusObject | null
|
||||
) => {
|
||||
// Only handle the first resolution result
|
||||
listener.onSuccessfulResolution = () => {};
|
||||
assert(addressList.length > 0);
|
||||
assert(endpointList.length > 0);
|
||||
done();
|
||||
},
|
||||
onError: (error: StatusObject) => {
|
||||
|
@ -227,7 +203,7 @@ describe('Name Resolver', () => {
|
|||
)!;
|
||||
const listener: resolverManager.ResolverListener = {
|
||||
onSuccessfulResolution: (
|
||||
addressList: SubchannelAddress[],
|
||||
endpointList: Endpoint[],
|
||||
serviceConfig: ServiceConfig | null,
|
||||
serviceConfigError: StatusObject | null
|
||||
) => {
|
||||
|
@ -253,7 +229,7 @@ describe('Name Resolver', () => {
|
|||
let count = 0;
|
||||
const listener: resolverManager.ResolverListener = {
|
||||
onSuccessfulResolution: (
|
||||
addressList: SubchannelAddress[],
|
||||
endpointList: Endpoint[],
|
||||
serviceConfig: ServiceConfig | null,
|
||||
serviceConfigError: StatusObject | null
|
||||
) => {
|
||||
|
@ -290,21 +266,16 @@ describe('Name Resolver', () => {
|
|||
)!;
|
||||
const listener: resolverManager.ResolverListener = {
|
||||
onSuccessfulResolution: (
|
||||
addressList: SubchannelAddress[],
|
||||
endpointList: Endpoint[],
|
||||
serviceConfig: ServiceConfig | null,
|
||||
serviceConfigError: StatusObject | null
|
||||
) => {
|
||||
// Only handle the first resolution result
|
||||
listener.onSuccessfulResolution = () => {};
|
||||
assert(
|
||||
addressList.some(
|
||||
addr =>
|
||||
isTcpSubchannelAddress(addr) &&
|
||||
addr.host === '127.0.0.1' &&
|
||||
addr.port === 443
|
||||
),
|
||||
`None of [${addressList.map(addr =>
|
||||
subchannelAddressToString(addr)
|
||||
hasMatchingAddress(endpointList, { host: '127.0.0.1', port: 443 }),
|
||||
`None of [${endpointList.map(addr =>
|
||||
endpointToString(addr)
|
||||
)}] matched '127.0.0.1:443'`
|
||||
);
|
||||
done();
|
||||
|
@ -324,20 +295,13 @@ describe('Name Resolver', () => {
|
|||
)!;
|
||||
const listener: resolverManager.ResolverListener = {
|
||||
onSuccessfulResolution: (
|
||||
addressList: SubchannelAddress[],
|
||||
endpointList: Endpoint[],
|
||||
serviceConfig: ServiceConfig | null,
|
||||
serviceConfigError: StatusObject | null
|
||||
) => {
|
||||
// Only handle the first resolution result
|
||||
listener.onSuccessfulResolution = () => {};
|
||||
assert(
|
||||
addressList.some(
|
||||
addr =>
|
||||
isTcpSubchannelAddress(addr) &&
|
||||
addr.host === '::1' &&
|
||||
addr.port === 443
|
||||
)
|
||||
);
|
||||
assert(hasMatchingAddress(endpointList, { host: '::1', port: 443 }));
|
||||
done();
|
||||
},
|
||||
onError: (error: StatusObject) => {
|
||||
|
@ -356,21 +320,16 @@ describe('Name Resolver', () => {
|
|||
)!;
|
||||
const listener: resolverManager.ResolverListener = {
|
||||
onSuccessfulResolution: (
|
||||
addressList: SubchannelAddress[],
|
||||
endpointList: Endpoint[],
|
||||
serviceConfig: ServiceConfig | null,
|
||||
serviceConfigError: StatusObject | null
|
||||
) => {
|
||||
// Only handle the first resolution result
|
||||
listener.onSuccessfulResolution = () => {};
|
||||
assert(
|
||||
addressList.some(
|
||||
addr =>
|
||||
isTcpSubchannelAddress(addr) &&
|
||||
addr.host === '127.0.0.1' &&
|
||||
addr.port === 443
|
||||
),
|
||||
`None of [${addressList.map(addr =>
|
||||
subchannelAddressToString(addr)
|
||||
hasMatchingAddress(endpointList, { host: '127.0.0.1', port: 443 }),
|
||||
`None of [${endpointList.map(addr =>
|
||||
endpointToString(addr)
|
||||
)}] matched '127.0.0.1:443'`
|
||||
);
|
||||
/* TODO(murgatroid99): check for IPv6 result, once we can get that
|
||||
|
@ -392,13 +351,13 @@ describe('Name Resolver', () => {
|
|||
)!;
|
||||
const listener: resolverManager.ResolverListener = {
|
||||
onSuccessfulResolution: (
|
||||
addressList: SubchannelAddress[],
|
||||
endpointList: Endpoint[],
|
||||
serviceConfig: ServiceConfig | null,
|
||||
serviceConfigError: StatusObject | null
|
||||
) => {
|
||||
// Only handle the first resolution result
|
||||
listener.onSuccessfulResolution = () => {};
|
||||
assert(addressList.length > 0);
|
||||
assert(endpointList.length > 0);
|
||||
done();
|
||||
},
|
||||
onError: (error: StatusObject) => {
|
||||
|
@ -422,11 +381,11 @@ describe('Name Resolver', () => {
|
|||
)!;
|
||||
const listener: resolverManager.ResolverListener = {
|
||||
onSuccessfulResolution: (
|
||||
addressList: SubchannelAddress[],
|
||||
endpointList: Endpoint[],
|
||||
serviceConfig: ServiceConfig | null,
|
||||
serviceConfigError: StatusObject | null
|
||||
) => {
|
||||
assert(addressList.length > 0);
|
||||
assert(endpointList.length > 0);
|
||||
completeCount += 1;
|
||||
if (completeCount === 2) {
|
||||
// Only handle the first resolution result
|
||||
|
@ -452,25 +411,15 @@ describe('Name Resolver', () => {
|
|||
target,
|
||||
{
|
||||
onSuccessfulResolution: (
|
||||
addressList: SubchannelAddress[],
|
||||
endpointList: Endpoint[],
|
||||
serviceConfig: ServiceConfig | null,
|
||||
serviceConfigError: StatusObject | null
|
||||
) => {
|
||||
assert(
|
||||
addressList.some(
|
||||
addr =>
|
||||
isTcpSubchannelAddress(addr) &&
|
||||
addr.host === '127.0.0.1' &&
|
||||
addr.port === 443
|
||||
)
|
||||
hasMatchingAddress(endpointList, { host: '127.0.0.1', port: 443 })
|
||||
);
|
||||
assert(
|
||||
addressList.some(
|
||||
addr =>
|
||||
isTcpSubchannelAddress(addr) &&
|
||||
addr.host === '::1' &&
|
||||
addr.port === 443
|
||||
)
|
||||
hasMatchingAddress(endpointList, { host: '::1', port: 443 })
|
||||
);
|
||||
resultCount += 1;
|
||||
if (resultCount === 1) {
|
||||
|
@ -498,7 +447,7 @@ describe('Name Resolver', () => {
|
|||
target,
|
||||
{
|
||||
onSuccessfulResolution: (
|
||||
addressList: SubchannelAddress[],
|
||||
endpointList: Endpoint[],
|
||||
serviceConfig: ServiceConfig | null,
|
||||
serviceConfigError: StatusObject | null
|
||||
) => {
|
||||
|
@ -527,17 +476,13 @@ describe('Name Resolver', () => {
|
|||
)!;
|
||||
const listener: resolverManager.ResolverListener = {
|
||||
onSuccessfulResolution: (
|
||||
addressList: SubchannelAddress[],
|
||||
endpointList: Endpoint[],
|
||||
serviceConfig: ServiceConfig | null,
|
||||
serviceConfigError: StatusObject | null
|
||||
) => {
|
||||
// Only handle the first resolution result
|
||||
listener.onSuccessfulResolution = () => {};
|
||||
assert(
|
||||
addressList.some(
|
||||
addr => !isTcpSubchannelAddress(addr) && addr.path === 'socket'
|
||||
)
|
||||
);
|
||||
assert(hasMatchingAddress(endpointList, { path: 'socket' }));
|
||||
done();
|
||||
},
|
||||
onError: (error: StatusObject) => {
|
||||
|
@ -553,18 +498,13 @@ describe('Name Resolver', () => {
|
|||
)!;
|
||||
const listener: resolverManager.ResolverListener = {
|
||||
onSuccessfulResolution: (
|
||||
addressList: SubchannelAddress[],
|
||||
endpointList: Endpoint[],
|
||||
serviceConfig: ServiceConfig | null,
|
||||
serviceConfigError: StatusObject | null
|
||||
) => {
|
||||
// Only handle the first resolution result
|
||||
listener.onSuccessfulResolution = () => {};
|
||||
assert(
|
||||
addressList.some(
|
||||
addr =>
|
||||
!isTcpSubchannelAddress(addr) && addr.path === '/tmp/socket'
|
||||
)
|
||||
);
|
||||
assert(hasMatchingAddress(endpointList, { path: '/tmp/socket' }));
|
||||
done();
|
||||
},
|
||||
onError: (error: StatusObject) => {
|
||||
|
@ -582,19 +522,14 @@ describe('Name Resolver', () => {
|
|||
)!;
|
||||
const listener: resolverManager.ResolverListener = {
|
||||
onSuccessfulResolution: (
|
||||
addressList: SubchannelAddress[],
|
||||
endpointList: Endpoint[],
|
||||
serviceConfig: ServiceConfig | null,
|
||||
serviceConfigError: StatusObject | null
|
||||
) => {
|
||||
// Only handle the first resolution result
|
||||
listener.onSuccessfulResolution = () => {};
|
||||
assert(
|
||||
addressList.some(
|
||||
addr =>
|
||||
isTcpSubchannelAddress(addr) &&
|
||||
addr.host === '127.0.0.1' &&
|
||||
addr.port === 443
|
||||
)
|
||||
hasMatchingAddress(endpointList, { host: '127.0.0.1', port: 443 })
|
||||
);
|
||||
done();
|
||||
},
|
||||
|
@ -611,19 +546,14 @@ describe('Name Resolver', () => {
|
|||
)!;
|
||||
const listener: resolverManager.ResolverListener = {
|
||||
onSuccessfulResolution: (
|
||||
addressList: SubchannelAddress[],
|
||||
endpointList: Endpoint[],
|
||||
serviceConfig: ServiceConfig | null,
|
||||
serviceConfigError: StatusObject | null
|
||||
) => {
|
||||
// Only handle the first resolution result
|
||||
listener.onSuccessfulResolution = () => {};
|
||||
assert(
|
||||
addressList.some(
|
||||
addr =>
|
||||
isTcpSubchannelAddress(addr) &&
|
||||
addr.host === '127.0.0.1' &&
|
||||
addr.port === 50051
|
||||
)
|
||||
hasMatchingAddress(endpointList, { host: '127.0.0.1', port: 50051 })
|
||||
);
|
||||
done();
|
||||
},
|
||||
|
@ -640,27 +570,17 @@ describe('Name Resolver', () => {
|
|||
)!;
|
||||
const listener: resolverManager.ResolverListener = {
|
||||
onSuccessfulResolution: (
|
||||
addressList: SubchannelAddress[],
|
||||
endpointList: Endpoint[],
|
||||
serviceConfig: ServiceConfig | null,
|
||||
serviceConfigError: StatusObject | null
|
||||
) => {
|
||||
// Only handle the first resolution result
|
||||
listener.onSuccessfulResolution = () => {};
|
||||
assert(
|
||||
addressList.some(
|
||||
addr =>
|
||||
isTcpSubchannelAddress(addr) &&
|
||||
addr.host === '127.0.0.1' &&
|
||||
addr.port === 50051
|
||||
)
|
||||
hasMatchingAddress(endpointList, { host: '127.0.0.1', port: 50051 })
|
||||
);
|
||||
assert(
|
||||
addressList.some(
|
||||
addr =>
|
||||
isTcpSubchannelAddress(addr) &&
|
||||
addr.host === '127.0.0.1' &&
|
||||
addr.port === 50052
|
||||
)
|
||||
hasMatchingAddress(endpointList, { host: '127.0.0.1', port: 50052 })
|
||||
);
|
||||
done();
|
||||
},
|
||||
|
@ -677,20 +597,13 @@ describe('Name Resolver', () => {
|
|||
)!;
|
||||
const listener: resolverManager.ResolverListener = {
|
||||
onSuccessfulResolution: (
|
||||
addressList: SubchannelAddress[],
|
||||
endpointList: Endpoint[],
|
||||
serviceConfig: ServiceConfig | null,
|
||||
serviceConfigError: StatusObject | null
|
||||
) => {
|
||||
// Only handle the first resolution result
|
||||
listener.onSuccessfulResolution = () => {};
|
||||
assert(
|
||||
addressList.some(
|
||||
addr =>
|
||||
isTcpSubchannelAddress(addr) &&
|
||||
addr.host === '::1' &&
|
||||
addr.port === 443
|
||||
)
|
||||
);
|
||||
assert(hasMatchingAddress(endpointList, { host: '::1', port: 443 }));
|
||||
done();
|
||||
},
|
||||
onError: (error: StatusObject) => {
|
||||
|
@ -706,19 +619,14 @@ describe('Name Resolver', () => {
|
|||
)!;
|
||||
const listener: resolverManager.ResolverListener = {
|
||||
onSuccessfulResolution: (
|
||||
addressList: SubchannelAddress[],
|
||||
endpointList: Endpoint[],
|
||||
serviceConfig: ServiceConfig | null,
|
||||
serviceConfigError: StatusObject | null
|
||||
) => {
|
||||
// Only handle the first resolution result
|
||||
listener.onSuccessfulResolution = () => {};
|
||||
assert(
|
||||
addressList.some(
|
||||
addr =>
|
||||
isTcpSubchannelAddress(addr) &&
|
||||
addr.host === '::1' &&
|
||||
addr.port === 50051
|
||||
)
|
||||
hasMatchingAddress(endpointList, { host: '::1', port: 50051 })
|
||||
);
|
||||
done();
|
||||
},
|
||||
|
@ -735,27 +643,17 @@ describe('Name Resolver', () => {
|
|||
)!;
|
||||
const listener: resolverManager.ResolverListener = {
|
||||
onSuccessfulResolution: (
|
||||
addressList: SubchannelAddress[],
|
||||
endpointList: Endpoint[],
|
||||
serviceConfig: ServiceConfig | null,
|
||||
serviceConfigError: StatusObject | null
|
||||
) => {
|
||||
// Only handle the first resolution result
|
||||
listener.onSuccessfulResolution = () => {};
|
||||
assert(
|
||||
addressList.some(
|
||||
addr =>
|
||||
isTcpSubchannelAddress(addr) &&
|
||||
addr.host === '::1' &&
|
||||
addr.port === 50051
|
||||
)
|
||||
hasMatchingAddress(endpointList, { host: '::1', port: 50051 })
|
||||
);
|
||||
assert(
|
||||
addressList.some(
|
||||
addr =>
|
||||
isTcpSubchannelAddress(addr) &&
|
||||
addr.host === '::1' &&
|
||||
addr.port === 50052
|
||||
)
|
||||
hasMatchingAddress(endpointList, { host: '::1', port: 50052 })
|
||||
);
|
||||
done();
|
||||
},
|
||||
|
|
Loading…
Reference in New Issue