Merge pull request #2925 from murgatroid99/grpc-js_channel_spec_resolver_update

grpc-js(-xds): Implement specified resolver and LB policy API changes
This commit is contained in:
Michael Lumish 2025-03-21 11:22:55 -07:00 committed by GitHub
commit e6da4ad1d8
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
26 changed files with 1031 additions and 790 deletions

View File

@ -41,6 +41,7 @@ import PickResult = grpc.experimental.PickResult;
import PickResultType = grpc.experimental.PickResultType;
import createChildChannelControlHelper = grpc.experimental.createChildChannelControlHelper;
import parseLoadBalancingConfig = grpc.experimental.parseLoadBalancingConfig;
import StatusOr = grpc.experimental.StatusOr;
import { ChannelOptions } from '@grpc/grpc-js';
grpc_xds.register();
@ -100,12 +101,12 @@ class RpcBehaviorLoadBalancer implements LoadBalancer {
});
this.child = new ChildLoadBalancerHandler(childChannelControlHelper);
}
updateAddressList(endpointList: Endpoint[], lbConfig: TypedLoadBalancingConfig, options: ChannelOptions): void {
updateAddressList(endpointList: StatusOr<Endpoint[]>, lbConfig: TypedLoadBalancingConfig, options: ChannelOptions, resolutionNote: string): boolean {
if (!(lbConfig instanceof RpcBehaviorLoadBalancingConfig)) {
return;
return false;
}
this.latestConfig = lbConfig;
this.child.updateAddressList(endpointList, RPC_BEHAVIOR_CHILD_CONFIG, options);
return this.child.updateAddressList(endpointList, RPC_BEHAVIOR_CHILD_CONFIG, options, resolutionNote);
}
exitIdle(): void {
this.child.exitIdle();

View File

@ -26,6 +26,8 @@ import ChannelControlHelper = experimental.ChannelControlHelper;
import registerLoadBalancerType = experimental.registerLoadBalancerType;
import TypedLoadBalancingConfig = experimental.TypedLoadBalancingConfig;
import parseLoadBalancingConfig = experimental.parseLoadBalancingConfig;
import StatusOr = experimental.StatusOr;
import statusOrFromValue = experimental.statusOrFromValue;
import { XdsConfig } from './xds-dependency-manager';
import { LocalityEndpoint, PriorityChildRaw } from './load-balancer-priority';
import { Locality__Output } from './generated/envoy/config/core/v3/Locality';
@ -205,7 +207,7 @@ function getLeafClusters(xdsConfig: XdsConfig, rootCluster: string, depth = 0):
if (!maybeClusterConfig) {
return [];
}
if (!maybeClusterConfig.success) {
if (!maybeClusterConfig.ok) {
return [rootCluster];
}
if (maybeClusterConfig.value.children.type === 'aggregate') {
@ -240,13 +242,14 @@ export class CdsLoadBalancer implements LoadBalancer {
}
updateAddressList(
endpointList: Endpoint[],
endpointList: StatusOr<Endpoint[]>,
lbConfig: TypedLoadBalancingConfig,
options: ChannelOptions
): void {
options: ChannelOptions,
resolutionNote: string
): boolean {
if (!(lbConfig instanceof CdsLoadBalancingConfig)) {
trace('Discarding address list update with unrecognized config ' + JSON.stringify(lbConfig, undefined, 2));
return;
return false;
}
trace('Received update with config ' + JSON.stringify(lbConfig, undefined, 2));
const xdsConfig = options[XDS_CONFIG_KEY] as XdsConfig;
@ -254,12 +257,12 @@ export class CdsLoadBalancer implements LoadBalancer {
const maybeClusterConfig = xdsConfig.clusters.get(clusterName);
if (!maybeClusterConfig) {
trace('Received update with no config for cluster ' + clusterName);
return;
return false;
}
if (!maybeClusterConfig.success) {
if (!maybeClusterConfig.ok) {
this.childBalancer.destroy();
this.channelControlHelper.updateState(connectivityState.TRANSIENT_FAILURE, new UnavailablePicker(maybeClusterConfig.error), maybeClusterConfig.error.details);
return;
return true;
}
const clusterConfig = maybeClusterConfig.value;
@ -270,8 +273,8 @@ export class CdsLoadBalancer implements LoadBalancer {
} catch (e) {
trace('xDS config parsing failed with error ' + (e as Error).message);
const errorMessage = `xDS config parsing failed with error ${(e as Error).message}`;
this.channelControlHelper.updateState(connectivityState.TRANSIENT_FAILURE, new UnavailablePicker({code: status.UNAVAILABLE, details: errorMessage}), errorMessage);
return;
this.channelControlHelper.updateState(connectivityState.TRANSIENT_FAILURE, new UnavailablePicker({code: status.UNAVAILABLE, details: `${errorMessage} Resolution note: ${resolutionNote}`}), errorMessage);
return true;
}
const priorityChildren: {[name: string]: PriorityChildRaw} = {};
for (const cluster of leafClusters) {
@ -296,16 +299,16 @@ export class CdsLoadBalancer implements LoadBalancer {
} catch (e) {
trace('LB policy config parsing failed with error ' + (e as Error).message);
const errorMessage = `LB policy config parsing failed with error ${(e as Error).message}`;
this.channelControlHelper.updateState(connectivityState.TRANSIENT_FAILURE, new UnavailablePicker({code: status.UNAVAILABLE, details: errorMessage}), errorMessage);
return;
this.channelControlHelper.updateState(connectivityState.TRANSIENT_FAILURE, new UnavailablePicker({code: status.UNAVAILABLE, details: `${errorMessage} Resolution note: ${resolutionNote}`}), errorMessage);
return true;
}
this.childBalancer.updateAddressList(endpointList, typedChildConfig, {...options, [ROOT_CLUSTER_KEY]: clusterName});
this.childBalancer.updateAddressList(endpointList, typedChildConfig, {...options, [ROOT_CLUSTER_KEY]: clusterName}, resolutionNote);
} else {
if (!clusterConfig.children.endpoints) {
trace('Received update with no resolved endpoints for cluster ' + clusterName);
const errorMessage = `Cluster ${clusterName} resolution failed: ${clusterConfig.children.resolutionNote}`;
this.channelControlHelper.updateState(connectivityState.TRANSIENT_FAILURE, new UnavailablePicker({code: status.UNAVAILABLE, details: errorMessage}), errorMessage);
return;
return false;
}
const newPriorityNames: string[] = [];
const newLocalityPriorities = new Map<string, number>();
@ -317,7 +320,7 @@ export class CdsLoadBalancer implements LoadBalancer {
if (AGGREGATE_CLUSTER_BACKWARDS_COMPAT) {
if (typeof options[ROOT_CLUSTER_KEY] === 'string') {
const maybeRootClusterConfig = xdsConfig.clusters.get(options[ROOT_CLUSTER_KEY]);
if (maybeRootClusterConfig?.success) {
if (maybeRootClusterConfig?.ok) {
endpointPickingPolicy = maybeRootClusterConfig.value.cluster.lbPolicyConfig;
}
}
@ -409,9 +412,9 @@ export class CdsLoadBalancer implements LoadBalancer {
typedChildConfig = parseLoadBalancingConfig(childConfig);
} catch (e) {
trace('LB policy config parsing failed with error ' + (e as Error).message);
const errorMessage = `LB policy config parsing failed with error ${(e as Error).message}`;
const errorMessage = `LB policy config parsing failed with error ${(e as Error).message}. Resolution note: ${resolutionNote}`;
this.channelControlHelper.updateState(connectivityState.TRANSIENT_FAILURE, new UnavailablePicker({code: status.UNAVAILABLE, details: errorMessage}), errorMessage);
return;
return false;
}
const childOptions: ChannelOptions = {...options};
if (clusterConfig.cluster.securityUpdate) {
@ -419,16 +422,16 @@ export class CdsLoadBalancer implements LoadBalancer {
const xdsClient = options[XDS_CLIENT_KEY] as XdsClient;
const caCertProvider = xdsClient.getCertificateProvider(securityUpdate.caCertificateProviderInstance);
if (!caCertProvider) {
const errorMessage = `Cluster ${clusterName} configured with CA certificate provider ${securityUpdate.caCertificateProviderInstance} not in bootstrap`;
const errorMessage = `Cluster ${clusterName} configured with CA certificate provider ${securityUpdate.caCertificateProviderInstance} not in bootstrap. Resolution note: ${resolutionNote}`;
this.channelControlHelper.updateState(connectivityState.TRANSIENT_FAILURE, new UnavailablePicker({code: status.UNAVAILABLE, details: errorMessage}), errorMessage);
return;
return false;
}
if (securityUpdate.identityCertificateProviderInstance) {
const identityCertProvider = xdsClient.getCertificateProvider(securityUpdate.identityCertificateProviderInstance);
if (!identityCertProvider) {
const errorMessage = `Cluster ${clusterName} configured with identity certificate provider ${securityUpdate.identityCertificateProviderInstance} not in bootstrap`;
const errorMessage = `Cluster ${clusterName} configured with identity certificate provider ${securityUpdate.identityCertificateProviderInstance} not in bootstrap. Resolution note: ${resolutionNote}`;
this.channelControlHelper.updateState(connectivityState.TRANSIENT_FAILURE, new UnavailablePicker({code: status.UNAVAILABLE, details: errorMessage}), errorMessage);
return;
return false;
}
childOptions[IDENTITY_CERT_PROVIDER_KEY] = identityCertProvider;
}
@ -440,8 +443,9 @@ export class CdsLoadBalancer implements LoadBalancer {
trace('Configured subject alternative name matcher: ' + sanMatcher);
childOptions[SAN_MATCHER_KEY] = this.latestSanMatcher;
}
this.childBalancer.updateAddressList(childEndpointList, typedChildConfig, childOptions);
this.childBalancer.updateAddressList(statusOrFromValue(childEndpointList), typedChildConfig, childOptions, resolutionNote);
}
return true;
}
exitIdle(): void {
this.childBalancer.exitIdle();

View File

@ -27,6 +27,8 @@ import QueuePicker = experimental.QueuePicker;
import UnavailablePicker = experimental.UnavailablePicker;
import ChildLoadBalancerHandler = experimental.ChildLoadBalancerHandler;
import selectLbConfigFromList = experimental.selectLbConfigFromList;
import StatusOr = experimental.StatusOr;
import statusOrFromValue = experimental.statusOrFromValue;
import { Locality__Output } from './generated/envoy/config/core/v3/Locality';
const TRACER_NAME = 'priority';
@ -155,9 +157,10 @@ class PriorityLoadBalancingConfig implements TypedLoadBalancingConfig {
interface PriorityChildBalancer {
updateAddressList(
endpointList: Endpoint[],
endpointList: StatusOr<Endpoint[]>,
lbConfig: TypedLoadBalancingConfig,
attributes: { [key: string]: unknown }
attributes: { [key: string]: unknown },
resolutionNote: string
): void;
exitIdle(): void;
resetBackoff(): void;
@ -240,11 +243,12 @@ export class PriorityLoadBalancer implements LoadBalancer {
}
updateAddressList(
endpointList: Endpoint[],
endpointList: StatusOr<Endpoint[]>,
lbConfig: TypedLoadBalancingConfig,
attributes: { [key: string]: unknown }
attributes: { [key: string]: unknown },
resolutionNote: string
): void {
this.childBalancer.updateAddressList(endpointList, lbConfig, attributes);
this.childBalancer.updateAddressList(endpointList, lbConfig, attributes, resolutionNote);
}
exitIdle() {
@ -332,6 +336,8 @@ export class PriorityLoadBalancer implements LoadBalancer {
private updatesPaused = false;
private latestResolutionNote: string = '';
constructor(private channelControlHelper: ChannelControlHelper) {}
private updateState(state: ConnectivityState, picker: Picker, errorMessage: string | null) {
@ -401,9 +407,10 @@ export class PriorityLoadBalancer implements LoadBalancer {
child = new this.PriorityChildImpl(this, childName, childUpdate.ignoreReresolutionRequests);
this.children.set(childName, child);
child.updateAddressList(
childUpdate.subchannelAddress,
statusOrFromValue(childUpdate.subchannelAddress),
childUpdate.lbConfig,
this.latestOptions
this.latestOptions,
this.latestResolutionNote
);
} else {
/* We're going to try to use this child, so reactivate it if it has been
@ -440,14 +447,21 @@ export class PriorityLoadBalancer implements LoadBalancer {
}
updateAddressList(
endpointList: Endpoint[],
endpointList: StatusOr<Endpoint[]>,
lbConfig: TypedLoadBalancingConfig,
options: ChannelOptions
): void {
options: ChannelOptions,
resolutionNote: string
): boolean {
if (!(lbConfig instanceof PriorityLoadBalancingConfig)) {
// Reject a config of the wrong type
trace('Discarding address list update with unrecognized config ' + JSON.stringify(lbConfig.toJsonObject(), undefined, 2));
return;
return false;
}
if (!endpointList.ok) {
if (this.latestUpdates.size === 0) {
this.updateState(ConnectivityState.TRANSIENT_FAILURE, new UnavailablePicker(endpointList.error), endpointList.error.details);
}
return true;
}
/* For each address, the first element of its localityPath array determines
* which child it belongs to. So we bucket those addresses by that first
@ -457,14 +471,14 @@ export class PriorityLoadBalancer implements LoadBalancer {
string,
LocalityEndpoint[]
>();
for (const endpoint of endpointList) {
for (const endpoint of endpointList.value) {
if (!isLocalityEndpoint(endpoint)) {
// Reject address that cannot be prioritized
return;
return false;
}
if (endpoint.localityPath.length < 1) {
// Reject address that cannot be prioritized
return;
return false;
}
const childName = endpoint.localityPath[0];
const childAddress: LocalityEndpoint = {
@ -495,9 +509,10 @@ export class PriorityLoadBalancer implements LoadBalancer {
const existingChild = this.children.get(childName);
if (existingChild !== undefined) {
existingChild.updateAddressList(
childAddresses,
statusOrFromValue(childAddresses),
childConfig.config,
options
options,
resolutionNote
);
}
}
@ -509,7 +524,9 @@ export class PriorityLoadBalancer implements LoadBalancer {
}
}
this.updatesPaused = false;
this.latestResolutionNote = resolutionNote;
this.choosePriority();
return true;
}
exitIdle(): void {
if (this.currentPriority !== null) {

View File

@ -31,6 +31,7 @@ import UnavailablePicker = experimental.UnavailablePicker;
import subchannelAddressToString = experimental.subchannelAddressToString;
import registerLoadBalancerType = experimental.registerLoadBalancerType;
import EndpointMap = experimental.EndpointMap;
import StatusOr = experimental.StatusOr;
import { loadXxhashApi, xxhashApi } from './xxhash';
import { EXPERIMENTAL_RING_HASH } from './environment';
import { loadProtosWithOptionsSync } from '@grpc/proto-loader/build/src/util';
@ -401,26 +402,44 @@ class RingHashLoadBalancer implements LoadBalancer {
}
updateAddressList(
endpointList: Endpoint[],
endpointList: StatusOr<Endpoint[]>,
lbConfig: TypedLoadBalancingConfig,
options: ChannelOptions
): void {
options: ChannelOptions,
resolutionNote: string
): boolean {
if (!(lbConfig instanceof RingHashLoadBalancingConfig)) {
trace('Discarding address update with unrecognized config ' + JSON.stringify(lbConfig.toJsonObject(), undefined, 2));
return;
return false;
}
if (!endpointList.ok) {
if (this.ring.length === 0) {
this.updateState(connectivityState.TRANSIENT_FAILURE, new UnavailablePicker(endpointList.error), endpointList.error.details);
}
return true;
}
if (endpointList.value.length === 0) {
for (const ringEntry of this.ring) {
ringEntry.leafBalancer.destroy();
}
this.ring = [];
this.leafMap.clear();
this.leafWeightMap.clear();
const errorMessage = `No addresses resolved. Resolution note: ${resolutionNote}`;
this.updateState(connectivityState.TRANSIENT_FAILURE, new UnavailablePicker({code: status.UNAVAILABLE, details: errorMessage}), errorMessage);
return false;
}
trace('Received update with config ' + JSON.stringify(lbConfig.toJsonObject(), undefined, 2));
this.updatesPaused = true;
this.leafWeightMap.clear();
const dedupedEndpointList: Endpoint[] = [];
for (const endpoint of endpointList) {
for (const endpoint of endpointList.value) {
const leafBalancer = this.leafMap.get(endpoint);
if (leafBalancer) {
leafBalancer.updateEndpoint(endpoint, options);
} else {
this.leafMap.set(
endpoint,
new LeafLoadBalancer(endpoint, this.childChannelControlHelper, options)
new LeafLoadBalancer(endpoint, this.childChannelControlHelper, options, resolutionNote)
);
}
const weight = this.leafWeightMap.get(endpoint);
@ -429,7 +448,7 @@ class RingHashLoadBalancer implements LoadBalancer {
}
this.leafWeightMap.set(endpoint, (weight ?? 0) + (isLocalityEndpoint(endpoint) ? endpoint.endpointWeight : 1));
}
const removedLeaves = this.leafMap.deleteMissing(endpointList);
const removedLeaves = this.leafMap.deleteMissing(endpointList.value);
for (const leaf of removedLeaves) {
leaf.destroy();
}
@ -440,6 +459,7 @@ class RingHashLoadBalancer implements LoadBalancer {
this.calculateAndUpdateState();
this.maybeProactivelyConnect();
});
return true;
}
exitIdle(): void {
/* This operation does not make sense here. We don't want to make the whole

View File

@ -15,7 +15,7 @@
*
*/
import { connectivityState as ConnectivityState, status as Status, Metadata, logVerbosity, experimental, LoadBalancingConfig, ChannelOptions } from "@grpc/grpc-js";
import { connectivityState as ConnectivityState, status as Status, Metadata, logVerbosity, experimental, LoadBalancingConfig, ChannelOptions, connectivityState, status } from "@grpc/grpc-js";
import { isLocalityEndpoint, LocalityEndpoint } from "./load-balancer-priority";
import TypedLoadBalancingConfig = experimental.TypedLoadBalancingConfig;
import LoadBalancer = experimental.LoadBalancer;
@ -30,6 +30,8 @@ import UnavailablePicker = experimental.UnavailablePicker;
import Endpoint = experimental.Endpoint;
import endpointToString = experimental.endpointToString;
import selectLbConfigFromList = experimental.selectLbConfigFromList;
import StatusOr = experimental.StatusOr;
import statusOrFromValue = experimental.statusOrFromValue;
const TRACER_NAME = 'weighted_target';
@ -154,7 +156,7 @@ class WeightedTargetPicker implements Picker {
}
interface WeightedChild {
updateAddressList(endpointList: Endpoint[], lbConfig: WeightedTarget, attributes: { [key: string]: unknown; }): void;
updateAddressList(endpointList: StatusOr<Endpoint[]>, lbConfig: WeightedTarget, options: ChannelOptions, resolutionNote: string): void;
exitIdle(): void;
resetBackoff(): void;
destroy(): void;
@ -193,9 +195,9 @@ export class WeightedTargetLoadBalancer implements LoadBalancer {
this.parent.maybeUpdateState();
}
updateAddressList(endpointList: Endpoint[], lbConfig: WeightedTarget, options: ChannelOptions): void {
updateAddressList(endpointList: StatusOr<Endpoint[]>, lbConfig: WeightedTarget, options: ChannelOptions, resolutionNote: string): void {
this.weight = lbConfig.weight;
this.childBalancer.updateAddressList(endpointList, lbConfig.child_policy, options);
this.childBalancer.updateAddressList(endpointList, lbConfig.child_policy, options, resolutionNote);
}
exitIdle(): void {
this.childBalancer.exitIdle();
@ -325,26 +327,41 @@ export class WeightedTargetLoadBalancer implements LoadBalancer {
this.channelControlHelper.updateState(connectivityState, picker, errorMessage);
}
updateAddressList(addressList: Endpoint[], lbConfig: TypedLoadBalancingConfig, options: ChannelOptions): void {
updateAddressList(addressList: StatusOr<Endpoint[]>, lbConfig: TypedLoadBalancingConfig, options: ChannelOptions, resolutionNote: string): boolean {
if (!(lbConfig instanceof WeightedTargetLoadBalancingConfig)) {
// Reject a config of the wrong type
trace('Discarding address list update with unrecognized config ' + JSON.stringify(lbConfig.toJsonObject(), undefined, 2));
return;
return false;
}
if (!addressList.ok) {
if (this.targets.size === 0) {
this.channelControlHelper.updateState(connectivityState.TRANSIENT_FAILURE, new UnavailablePicker(addressList.error), addressList.error.details);
}
return true;
}
if (addressList.value.length === 0) {
for (const target of this.targets.values()) {
target.destroy();
}
this.targets.clear();
this.targetList = [];
const errorMessage = `No addresses resolved. Resolution note: ${resolutionNote}`;
this.channelControlHelper.updateState(connectivityState.TRANSIENT_FAILURE, new UnavailablePicker({code: status.UNAVAILABLE, details: errorMessage}), errorMessage);
return false;
}
/* For each address, the first element of its localityPath array determines
* which child it belongs to. So we bucket those addresses by that first
* element, and pass along the rest of the localityPath for that child
* to use. */
const childEndpointMap = new Map<string, LocalityEndpoint[]>();
for (const address of addressList) {
for (const address of addressList.value) {
if (!isLocalityEndpoint(address)) {
// Reject address that cannot be associated with targets
return;
return false;
}
if (address.localityPath.length < 1) {
// Reject address that cannot be associated with targets
return;
return false;
}
const childName = address.localityPath[0];
const childAddress: LocalityEndpoint = {
@ -371,7 +388,7 @@ export class WeightedTargetLoadBalancer implements LoadBalancer {
}
const targetEndpoints = childEndpointMap.get(targetName) ?? [];
trace('Assigning target ' + targetName + ' address list ' + targetEndpoints.map(endpoint => '(' + endpointToString(endpoint) + ' path=' + endpoint.localityPath + ')'));
target.updateAddressList(targetEndpoints, targetConfig, options);
target.updateAddressList(statusOrFromValue(targetEndpoints), targetConfig, options, resolutionNote);
}
// Deactivate targets that are not in the new config
@ -384,6 +401,7 @@ export class WeightedTargetLoadBalancer implements LoadBalancer {
this.updatesPaused = false;
this.updateState();
return true;
}
exitIdle(): void {
for (const targetName of this.targetList) {

View File

@ -37,6 +37,7 @@ import selectLbConfigFromList = experimental.selectLbConfigFromList;
import SubchannelInterface = experimental.SubchannelInterface;
import BaseSubchannelWrapper = experimental.BaseSubchannelWrapper;
import UnavailablePicker = experimental.UnavailablePicker;
import StatusOr = experimental.StatusOr;
import { Locality__Output } from "./generated/envoy/config/core/v3/Locality";
import { ClusterConfig, XdsConfig } from "./xds-dependency-manager";
import { CdsUpdate } from "./xds-resource-type/cluster-resource-type";
@ -206,7 +207,7 @@ function getCallCounterMapKey(cluster: string, edsServiceName?: string): string
class XdsClusterImplBalancer implements LoadBalancer {
private childBalancer: ChildLoadBalancerHandler;
private lastestEndpointList: Endpoint[] | null = null;
private lastestEndpointList: StatusOr<Endpoint[]> | null = null;
private latestConfig: XdsClusterImplLoadBalancingConfig | null = null;
private clusterDropStats: XdsClusterDropStats | null = null;
private xdsClient: XdsClient | null = null;
@ -215,12 +216,12 @@ class XdsClusterImplBalancer implements LoadBalancer {
constructor(private readonly channelControlHelper: ChannelControlHelper) {
this.childBalancer = new ChildLoadBalancerHandler(createChildChannelControlHelper(channelControlHelper, {
createSubchannel: (subchannelAddress, subchannelArgs) => {
if (!this.xdsClient || !this.latestConfig || !this.lastestEndpointList || !this.latestClusterConfig) {
throw new Error('xds_cluster_impl: invalid state: createSubchannel called with xdsClient or latestConfig not populated');
if (!this.xdsClient || !this.latestConfig || !this.lastestEndpointList || !this.lastestEndpointList.ok || !this.latestClusterConfig) {
throw new Error('xds_cluster_impl: invalid state: createSubchannel called with xdsClient or latestConfig not populated or with resolver error');
}
const wrapperChild = channelControlHelper.createSubchannel(subchannelAddress, subchannelArgs);
let locality: Locality__Output | null = null;
for (const endpoint of this.lastestEndpointList) {
for (const endpoint of this.lastestEndpointList.value) {
if (endpointHasAddress(endpoint, subchannelAddress)) {
locality = (endpoint as LocalityEndpoint).locality;
}
@ -251,28 +252,28 @@ class XdsClusterImplBalancer implements LoadBalancer {
}
}));
}
updateAddressList(endpointList: Endpoint[], lbConfig: TypedLoadBalancingConfig, options: ChannelOptions): void {
updateAddressList(endpointList: StatusOr<Endpoint[]>, lbConfig: TypedLoadBalancingConfig, options: ChannelOptions, resolutionNote: string): boolean {
if (!(lbConfig instanceof XdsClusterImplLoadBalancingConfig)) {
trace('Discarding address list update with unrecognized config ' + JSON.stringify(lbConfig.toJsonObject(), undefined, 2));
return;
return false;
}
trace('Received update with config: ' + JSON.stringify(lbConfig.toJsonObject(), undefined, 2));
const xdsConfig = options[XDS_CONFIG_KEY] as XdsConfig;
const maybeClusterConfig = xdsConfig.clusters.get(lbConfig.getCluster());
if (!maybeClusterConfig) {
trace('Received update with no config for cluster ' + lbConfig.getCluster());
return;
return false;
}
if (!maybeClusterConfig.success) {
if (!maybeClusterConfig.ok) {
this.latestClusterConfig = null;
this.childBalancer.destroy();
this.channelControlHelper.updateState(connectivityState.TRANSIENT_FAILURE, new UnavailablePicker(maybeClusterConfig.error), maybeClusterConfig.error.details);
return;
return false;
}
const clusterConfig = maybeClusterConfig.value;
if (clusterConfig.children.type === 'aggregate') {
trace('Received update for aggregate cluster ' + lbConfig.getCluster());
return;
return false;
}
if (!clusterConfig.children.endpoints) {
this.childBalancer.destroy();
@ -291,7 +292,8 @@ class XdsClusterImplBalancer implements LoadBalancer {
);
}
this.childBalancer.updateAddressList(endpointList, lbConfig.getChildPolicy(), options);
this.childBalancer.updateAddressList(endpointList, lbConfig.getChildPolicy(), options, resolutionNote);
return true;
}
exitIdle(): void {
this.childBalancer.exitIdle();

View File

@ -30,6 +30,7 @@ import ChildLoadBalancerHandler = experimental.ChildLoadBalancerHandler;
import ChannelControlHelper = experimental.ChannelControlHelper;
import selectLbConfigFromList = experimental.selectLbConfigFromList;
import registerLoadBalancerType = experimental.registerLoadBalancerType;
import StatusOr = experimental.StatusOr;
const TRACER_NAME = 'xds_cluster_manager';
@ -111,7 +112,7 @@ class XdsClusterManagerPicker implements Picker {
}
interface XdsClusterManagerChild {
updateAddressList(endpointList: Endpoint[], childConfig: TypedLoadBalancingConfig, attributes: { [key: string]: unknown; }): void;
updateAddressList(endpointList: StatusOr<Endpoint[]>, childConfig: TypedLoadBalancingConfig, options: ChannelOptions, resolutionNote: string): void;
exitIdle(): void;
resetBackoff(): void;
destroy(): void;
@ -145,8 +146,8 @@ class XdsClusterManager implements LoadBalancer {
}
this.parent.maybeUpdateState();
}
updateAddressList(endpointList: Endpoint[], childConfig: TypedLoadBalancingConfig, options: ChannelOptions): void {
this.childBalancer.updateAddressList(endpointList, childConfig, options);
updateAddressList(endpointList: StatusOr<Endpoint[]>, childConfig: TypedLoadBalancingConfig, options: ChannelOptions, resolutionNote: string): void {
this.childBalancer.updateAddressList(endpointList, childConfig, options, resolutionNote);
}
exitIdle(): void {
this.childBalancer.exitIdle();
@ -213,11 +214,11 @@ class XdsClusterManager implements LoadBalancer {
this.channelControlHelper.updateState(connectivityState, new XdsClusterManagerPicker(pickerMap), errorMessage);
}
updateAddressList(endpointList: Endpoint[], lbConfig: TypedLoadBalancingConfig, options: ChannelOptions): void {
updateAddressList(endpointList: StatusOr<Endpoint[]>, lbConfig: TypedLoadBalancingConfig, options: ChannelOptions, resolutionNote: string): boolean {
if (!(lbConfig instanceof XdsClusterManagerLoadBalancingConfig)) {
// Reject a config of the wrong type
trace('Discarding address list update with unrecognized config ' + JSON.stringify(lbConfig.toJsonObject(), undefined, 2));
return;
return false;;
}
trace('Received update with config: ' + JSON.stringify(lbConfig.toJsonObject(), undefined, 2));
const configChildren = lbConfig.getChildren();
@ -240,10 +241,11 @@ class XdsClusterManager implements LoadBalancer {
child = new this.XdsClusterManagerChildImpl(this, name);
this.children.set(name, child);
}
child.updateAddressList(endpointList, childConfig, options);
child.updateAddressList(endpointList, childConfig, options, resolutionNote);
}
this.updatesPaused = false;
this.updateState();
return true;
}
exitIdle(): void {
for (const child of this.children.values()) {

View File

@ -29,6 +29,7 @@ import ChildLoadBalancerHandler = experimental.ChildLoadBalancerHandler;
import Endpoint = experimental.Endpoint;
import parseLoadBalancingConfig = experimental.parseLoadBalancingConfig;
import registerLoadBalancerType = experimental.registerLoadBalancerType;
import StatusOr = experimental.StatusOr;
import { Any__Output } from "./generated/google/protobuf/Any";
import { WrrLocality__Output } from "./generated/envoy/extensions/load_balancing_policies/wrr_locality/v3/WrrLocality";
import { TypedExtensionConfig__Output } from "./generated/envoy/config/core/v3/TypedExtensionConfig";
@ -76,15 +77,19 @@ class XdsWrrLocalityLoadBalancer implements LoadBalancer {
constructor(private readonly channelControlHelper: ChannelControlHelper) {
this.childBalancer = new ChildLoadBalancerHandler(channelControlHelper);
}
updateAddressList(endpointList: Endpoint[], lbConfig: TypedLoadBalancingConfig, options: ChannelOptions): void {
updateAddressList(endpointList: StatusOr<Endpoint[]>, lbConfig: TypedLoadBalancingConfig, options: ChannelOptions, resolutionNote: string): boolean {
if (!(lbConfig instanceof XdsWrrLocalityLoadBalancingConfig)) {
trace('Discarding address list update with unrecognized config ' + JSON.stringify(lbConfig, undefined, 2));
return;
return false;
}
if (!endpointList.ok) {
this.childBalancer.updateAddressList(endpointList, parseLoadBalancingConfig({weighted_target: { targets: [] }}), options, resolutionNote);
return true;
}
const targets: {[localityName: string]: WeightedTargetRaw} = {};
for (const address of endpointList) {
for (const address of endpointList.value) {
if (!isLocalityEndpoint(address)) {
return;
return false;
}
const localityName = localityToName(address.locality);
if (!(localityName in targets)) {
@ -99,7 +104,8 @@ class XdsWrrLocalityLoadBalancer implements LoadBalancer {
targets: targets
}
};
this.childBalancer.updateAddressList(endpointList, parseLoadBalancingConfig(childConfig), options);
this.childBalancer.updateAddressList(endpointList, parseLoadBalancingConfig(childConfig), options, resolutionNote);
return true;
}
exitIdle(): void {
this.childBalancer.exitIdle();

View File

@ -41,6 +41,9 @@ import { loadXxhashApi } from './xxhash';
import { formatTemplateString } from './xds-bootstrap';
import { getPredicateForMatcher } from './route';
import { XdsConfig, XdsConfigWatcher, XdsDependencyManager } from './xds-dependency-manager';
import statusOrFromValue = experimental.statusOrFromValue;
import statusOrFromError = experimental.statusOrFromError;
import CHANNEL_ARGS_CONFIG_SELECTOR_KEY = experimental.CHANNEL_ARGS_CONFIG_SELECTOR_KEY;
const TRACER_NAME = 'xds_resolver';
@ -143,7 +146,7 @@ class XdsResolver implements Resolver {
trace('Resolution error for target ' + uriToString(this.target) + ': ' + context + ' does not exist');
/* Return an empty endpoint list and service config, to explicitly
* invalidate any previously returned service config */
this.listener.onSuccessfulResolution([], null, null, null, {});
this.listener(statusOrFromValue([]), {}, null, '');
}
}
}
@ -407,20 +410,20 @@ class XdsResolver implements Resolver {
methodConfig: [],
loadBalancingConfig: [lbPolicyConfig]
}
this.listener.onSuccessfulResolution([], serviceConfig, null, configSelector, {
this.listener(statusOrFromValue([]), {
[XDS_CLIENT_KEY]: this.xdsClient,
[XDS_CONFIG_KEY]: xdsConfig
});
[XDS_CONFIG_KEY]: xdsConfig,
[CHANNEL_ARGS_CONFIG_SELECTOR_KEY]: configSelector
}, statusOrFromValue(serviceConfig), '');
}
private reportResolutionError(reason: string) {
this.listener.onError({
this.listener(statusOrFromError({
code: status.UNAVAILABLE,
details: `xDS name resolution failed for target ${uriToString(
this.target
)}: ${reason}`,
metadata: new Metadata(),
});
)}: ${reason}`
}), {}, null, '');
}
private startResolution(): void {

View File

@ -25,6 +25,7 @@ import { DropCategory } from "./load-balancer-xds-cluster-impl";
import Endpoint = experimental.Endpoint;
import Resolver = experimental.Resolver;
import createResolver = experimental.createResolver;
import StatusOr = experimental.StatusOr;
import { decodeSingleResource, HTTP_CONNECTION_MANGER_TYPE_URL } from "./resources";
import { RouteConfigurationResourceType } from "./xds-resource-type/route-config-resource-type";
import { ListenerResourceType } from "./xds-resource-type/listener-resource-type";
@ -75,14 +76,6 @@ export interface ClusterConfig {
children: EndpointConfig | AggregateConfig;
}
export type StatusOr<T> = {
success: true;
value: T
} | {
success: false;
error: StatusObject;
}
export interface ClusterResult {
clusterConfig?: ClusterConfig;
status?: StatusObject;
@ -159,7 +152,7 @@ function isClusterTreeFullyUpdated(tree: ClusterGraph, roots: string[]): Cluster
reason: 'Cluster entry ' + next + ' not updated'
};
}
if (tree[next].latestUpdate.success) {
if (tree[next].latestUpdate.ok) {
if (tree[next].latestUpdate.value.type !== 'AGGREGATE') {
if (!(tree[next].latestUpdate.value.latestUpdate)) {
return {
@ -470,7 +463,7 @@ export class XdsDependencyManager {
this.trace('Not sending update: Cluster entry ' + clusterName + ' not updated (not caught by isClusterTreeFullyUpdated)');
return;
}
if (entry.latestUpdate.success) {
if (entry.latestUpdate.ok) {
let clusterChildren: EndpointConfig | AggregateConfig;
if (entry.latestUpdate.value.type === 'AGGREGATE') {
clusterChildren = {
@ -485,7 +478,7 @@ export class XdsDependencyManager {
};
}
update.clusters.set(clusterName, {
success: true,
ok: true,
value: {
cluster: entry.latestUpdate.value.cdsUpdate,
children: clusterChildren
@ -493,7 +486,7 @@ export class XdsDependencyManager {
});
} else {
update.clusters.set(clusterName, {
success: false,
ok: false,
error: entry.latestUpdate.error
});
}
@ -510,7 +503,7 @@ export class XdsDependencyManager {
onResourceChanged: (update: CdsUpdate) => {
switch (update.type) {
case 'AGGREGATE':
if (entry.latestUpdate?.success) {
if (entry.latestUpdate?.ok) {
switch (entry.latestUpdate.value.type) {
case 'AGGREGATE':
break;
@ -525,7 +518,7 @@ export class XdsDependencyManager {
}
entry.children = update.aggregateChildren;
entry.latestUpdate = {
success: true,
ok: true,
value: {
type: 'AGGREGATE',
cdsUpdate: update
@ -539,7 +532,7 @@ export class XdsDependencyManager {
break;
case 'EDS':
const edsServiceName = update.edsServiceName ?? clusterName;
if (entry.latestUpdate?.success) {
if (entry.latestUpdate?.ok) {
switch (entry.latestUpdate.value.type) {
case 'AGGREGATE':
entry.children = [];
@ -566,14 +559,14 @@ export class XdsDependencyManager {
}
const edsWatcher = new Watcher<ClusterLoadAssignment__Output>({
onResourceChanged: (endpoint: ClusterLoadAssignment__Output) => {
if (entry.latestUpdate?.success && entry.latestUpdate.value.type === 'EDS') {
if (entry.latestUpdate?.ok && entry.latestUpdate.value.type === 'EDS') {
entry.latestUpdate.value.latestUpdate = getEdsResource(endpoint);
entry.latestUpdate.value.resolutionNote = undefined;
this.maybeSendUpdate();
}
},
onError: error => {
if (entry.latestUpdate?.success && entry.latestUpdate.value.type === 'EDS') {
if (entry.latestUpdate?.ok && entry.latestUpdate.value.type === 'EDS') {
if (!entry.latestUpdate.value.latestUpdate) {
entry.latestUpdate.value.resolutionNote = `Control plane error: ${error.details}`;
this.maybeSendUpdate();
@ -581,7 +574,7 @@ export class XdsDependencyManager {
}
},
onResourceDoesNotExist: () => {
if (entry.latestUpdate?.success && entry.latestUpdate.value.type === 'EDS') {
if (entry.latestUpdate?.ok && entry.latestUpdate.value.type === 'EDS') {
entry.latestUpdate.value.resolutionNote = 'Resource does not exist';
entry.latestUpdate.value.latestUpdate = undefined;
this.maybeSendUpdate();
@ -589,7 +582,7 @@ export class XdsDependencyManager {
}
});
entry.latestUpdate = {
success: true,
ok: true,
value: {
type: 'EDS',
cdsUpdate: update,
@ -602,7 +595,7 @@ export class XdsDependencyManager {
this.maybeSendUpdate();
break;
case 'LOGICAL_DNS': {
if (entry.latestUpdate?.success) {
if (entry.latestUpdate?.ok) {
switch (entry.latestUpdate.value.type) {
case 'AGGREGATE':
entry.children = [];
@ -621,24 +614,24 @@ export class XdsDependencyManager {
}
}
this.trace('Creating DNS resolver for hostname ' + update.dnsHostname!);
const resolver = createResolver({scheme: 'dns', path: update.dnsHostname!}, {
onSuccessfulResolution: endpointList => {
if (entry.latestUpdate?.success && entry.latestUpdate.value.type === 'LOGICAL_DNS') {
entry.latestUpdate.value.latestUpdate = getDnsResource(endpointList);
const resolver = createResolver({scheme: 'dns', path: update.dnsHostname!}, endpointList => {
if (endpointList.ok) {
if (entry.latestUpdate?.ok && entry.latestUpdate.value.type === 'LOGICAL_DNS') {
entry.latestUpdate.value.latestUpdate = getDnsResource(endpointList.value);
this.maybeSendUpdate();
}
},
onError: error => {
if (entry.latestUpdate?.success && entry.latestUpdate.value.type === 'LOGICAL_DNS') {
} else {
if (entry.latestUpdate?.ok && entry.latestUpdate.value.type === 'LOGICAL_DNS') {
if (!entry.latestUpdate.value.latestUpdate) {
entry.latestUpdate.value.resolutionNote = `DNS resolution error: ${error.details}`;
entry.latestUpdate.value.resolutionNote = `DNS resolution error: ${endpointList.error.details}`;
this.maybeSendUpdate();
}
}
}
return true;
}, {'grpc.service_config_disable_resolution': 1});
entry.latestUpdate = {
success: true,
ok: true,
value: {
type: 'LOGICAL_DNS',
cdsUpdate: update,
@ -653,16 +646,16 @@ export class XdsDependencyManager {
}
},
onError: error => {
if (!entry.latestUpdate?.success) {
if (!entry.latestUpdate?.ok) {
entry.latestUpdate = {
success: false,
ok: false,
error: error
};
this.maybeSendUpdate();
}
},
onResourceDoesNotExist: () => {
if (entry.latestUpdate?.success) {
if (entry.latestUpdate?.ok) {
switch (entry.latestUpdate.value.type) {
case 'EDS':
this.trace('EDS.cancelWatch(' + entry.latestUpdate.value.edsServiceName + '): CDS resource does not exist');
@ -676,7 +669,7 @@ export class XdsDependencyManager {
}
}
entry.latestUpdate = {
success: false,
ok: false,
error: {
code: status.UNAVAILABLE,
details: `Cluster resource ${clusterName} does not exist`,
@ -718,7 +711,7 @@ export class XdsDependencyManager {
return;
}
const entry = this.clusterForest[clusterName];
if (entry.latestUpdate?.success) {
if (entry.latestUpdate?.ok) {
switch (entry.latestUpdate.value.type) {
case 'EDS':
this.trace('EDS.cancelWatch(' + entry.latestUpdate.value.edsServiceName + '): Cluster ' + clusterName + ' removed');
@ -806,7 +799,7 @@ export class XdsDependencyManager {
updateResolution() {
for (const clusterEntry of Object.values(this.clusterForest)) {
if (clusterEntry.latestUpdate?.success && clusterEntry.latestUpdate.value.type === 'LOGICAL_DNS') {
if (clusterEntry.latestUpdate?.ok && clusterEntry.latestUpdate.value.type === 'LOGICAL_DNS') {
clusterEntry.latestUpdate.value.resolver.updateResolution();
}
}

View File

@ -38,6 +38,7 @@ import PickResultType = experimental.PickResultType;
import createChildChannelControlHelper = experimental.createChildChannelControlHelper;
import parseLoadBalancingConfig = experimental.parseLoadBalancingConfig;
import registerLoadBalancerType = experimental.registerLoadBalancerType;
import StatusOr = experimental.StatusOr;
import { PickFirst } from "../src/generated/envoy/extensions/load_balancing_policies/pick_first/v3/PickFirst";
const LB_POLICY_NAME = 'test.RpcBehaviorLoadBalancer';
@ -95,12 +96,12 @@ class RpcBehaviorLoadBalancer implements LoadBalancer {
});
this.child = new ChildLoadBalancerHandler(childChannelControlHelper);
}
updateAddressList(endpointList: Endpoint[], lbConfig: TypedLoadBalancingConfig, options: ChannelOptions): void {
updateAddressList(endpointList: StatusOr<Endpoint[]>, lbConfig: TypedLoadBalancingConfig, options: ChannelOptions, resolutionNote: string): boolean {
if (!(lbConfig instanceof RpcBehaviorLoadBalancingConfig)) {
return;
return false;
}
this.latestConfig = lbConfig;
this.child.updateAddressList(endpointList, RPC_BEHAVIOR_CHILD_CONFIG, options);
return this.child.updateAddressList(endpointList, RPC_BEHAVIOR_CHILD_CONFIG, options, resolutionNote);
}
exitIdle(): void {
this.child.exitIdle();

View File

@ -41,6 +41,35 @@ export type PartialStatusObject = Pick<StatusObject, 'code' | 'details'> & {
metadata?: Metadata | null | undefined;
};
export interface StatusOrOk<T> {
ok: true;
value: T;
}
export interface StatusOrError {
ok: false;
error: StatusObject;
}
export type StatusOr<T> = StatusOrOk<T> | StatusOrError;
export function statusOrFromValue<T>(value: T): StatusOr<T> {
return {
ok: true,
value: value
};
}
export function statusOrFromError<T>(error: PartialStatusObject): StatusOr<T> {
return {
ok: false,
error: {
...error,
metadata: error.metadata ?? new Metadata()
}
};
}
export const enum WriteFlags {
BufferHint = 1,
NoCompress = 2,

View File

@ -5,6 +5,7 @@ export {
registerResolver,
ConfigSelector,
createResolver,
CHANNEL_ARGS_CONFIG_SELECTOR_KEY,
} from './resolver';
export { GrpcUri, uriToString, splitHostPort, HostPort } from './uri-parser';
export { Duration, durationToMs, parseDuration } from './duration';
@ -37,7 +38,12 @@ export {
PickArgs,
PickResultType,
} from './picker';
export { Call as CallStream } from './call-interface';
export {
Call as CallStream,
StatusOr,
statusOrFromValue,
statusOrFromError
} from './call-interface';
export { Filter, BaseFilter, FilterFactory } from './filter';
export { FilterStackFactory } from './filter-stack';
export { registerAdminService } from './admin';

View File

@ -27,6 +27,7 @@ import { ConnectivityState } from './connectivity-state';
import { Picker } from './picker';
import type { ChannelRef, SubchannelRef } from './channelz';
import { SubchannelInterface } from './subchannel-interface';
import { StatusOr } from './call-interface';
const TYPE_NAME = 'child_load_balancer_helper';
@ -102,10 +103,11 @@ export class ChildLoadBalancerHandler {
* @param attributes
*/
updateAddressList(
endpointList: Endpoint[],
endpointList: StatusOr<Endpoint[]>,
lbConfig: TypedLoadBalancingConfig,
options: ChannelOptions
): void {
options: ChannelOptions,
resolutionNote: string
): boolean {
let childToUpdate: LoadBalancer;
if (
this.currentChild === null ||
@ -133,7 +135,7 @@ export class ChildLoadBalancerHandler {
}
}
this.latestConfig = lbConfig;
childToUpdate.updateAddressList(endpointList, lbConfig, options);
return childToUpdate.updateAddressList(endpointList, lbConfig, options, resolutionNote);
}
exitIdle(): void {
if (this.currentChild) {

View File

@ -43,6 +43,7 @@ import {
} from './subchannel-interface';
import * as logging from './logging';
import { LoadBalancingConfig } from './service-config';
import { StatusOr } from './call-interface';
const TRACER_NAME = 'outlier_detection';
@ -757,28 +758,31 @@ export class OutlierDetectionLoadBalancer implements LoadBalancer {
}
updateAddressList(
endpointList: Endpoint[],
endpointList: StatusOr<Endpoint[]>,
lbConfig: TypedLoadBalancingConfig,
options: ChannelOptions
): void {
options: ChannelOptions,
resolutionNote: string
): boolean {
if (!(lbConfig instanceof OutlierDetectionLoadBalancingConfig)) {
return;
return false;
}
trace('Received update with config: ' + JSON.stringify(lbConfig.toJsonObject(), undefined, 2))
for (const endpoint of endpointList) {
if (!this.entryMap.has(endpoint)) {
trace('Adding map entry for ' + endpointToString(endpoint));
this.entryMap.set(endpoint, {
counter: new CallCounter(),
currentEjectionTimestamp: null,
ejectionTimeMultiplier: 0,
subchannelWrappers: [],
});
trace('Received update with config: ' + JSON.stringify(lbConfig.toJsonObject(), undefined, 2));
if (endpointList.ok) {
for (const endpoint of endpointList.value) {
if (!this.entryMap.has(endpoint)) {
trace('Adding map entry for ' + endpointToString(endpoint));
this.entryMap.set(endpoint, {
counter: new CallCounter(),
currentEjectionTimestamp: null,
ejectionTimeMultiplier: 0,
subchannelWrappers: [],
});
}
}
this.entryMap.deleteMissing(endpointList.value);
}
this.entryMap.deleteMissing(endpointList);
const childPolicy = lbConfig.getChildPolicy();
this.childBalancer.updateAddressList(endpointList, childPolicy, options);
this.childBalancer.updateAddressList(endpointList, childPolicy, options, resolutionNote);
if (
lbConfig.getSuccessRateEjectionConfig() ||
@ -808,6 +812,7 @@ export class OutlierDetectionLoadBalancer implements LoadBalancer {
}
this.latestConfig = lbConfig;
return true;
}
exitIdle(): void {
this.childBalancer.exitIdle();

View File

@ -43,6 +43,7 @@ import {
import { isTcpSubchannelAddress } from './subchannel-address';
import { isIPv6 } from 'net';
import { ChannelOptions } from './channel-options';
import { StatusOr, statusOrFromValue } from './call-interface';
const TRACER_NAME = 'pick_first';
@ -236,6 +237,8 @@ export class PickFirstLoadBalancer implements LoadBalancer {
private latestOptions: ChannelOptions = {};
private latestResolutionNote: string = '';
/**
* Load balancer that attempts to connect to each backend in the address list
* in order, and picks the first one that connects, using it for every
@ -277,7 +280,7 @@ export class PickFirstLoadBalancer implements LoadBalancer {
);
}
} else if (this.latestAddressList?.length === 0) {
const errorMessage = `No connection established. Last error: ${this.lastError}`;
const errorMessage = `No connection established. Last error: ${this.lastError}. Resolution note: ${this.latestResolutionNote}`;
this.updateState(
ConnectivityState.TRANSIENT_FAILURE,
new UnavailablePicker({
@ -289,7 +292,7 @@ export class PickFirstLoadBalancer implements LoadBalancer {
this.updateState(ConnectivityState.IDLE, new QueuePicker(this), null);
} else {
if (this.stickyTransientFailureMode) {
const errorMessage = `No connection established. Last error: ${this.lastError}`;
const errorMessage = `No connection established. Last error: ${this.lastError}. Resolution note: ${this.latestResolutionNote}`;
this.updateState(
ConnectivityState.TRANSIENT_FAILURE,
new UnavailablePicker({
@ -505,13 +508,25 @@ export class PickFirstLoadBalancer implements LoadBalancer {
}
updateAddressList(
endpointList: Endpoint[],
maybeEndpointList: StatusOr<Endpoint[]>,
lbConfig: TypedLoadBalancingConfig,
options: ChannelOptions
): void {
options: ChannelOptions,
resolutionNote: string
): boolean {
if (!(lbConfig instanceof PickFirstLoadBalancingConfig)) {
return;
return false;
}
if (!maybeEndpointList.ok) {
if (this.children.length === 0 && this.currentPick === null) {
this.channelControlHelper.updateState(
ConnectivityState.TRANSIENT_FAILURE,
new UnavailablePicker(maybeEndpointList.error),
maybeEndpointList.error.details
);
}
return true;
}
let endpointList = maybeEndpointList.value;
this.reportHealthStatus = options[REPORT_HEALTH_STATUS_OPTION_NAME];
/* Previously, an update would be discarded if it was identical to the
* previous update, to minimize churn. Now the DNS resolver is
@ -523,13 +538,17 @@ export class PickFirstLoadBalancer implements LoadBalancer {
...endpointList.map(endpoint => endpoint.addresses)
);
trace('updateAddressList([' + rawAddressList.map(address => subchannelAddressToString(address)) + '])');
if (rawAddressList.length === 0) {
this.lastError = 'No addresses resolved';
}
const addressList = interleaveAddressFamilies(rawAddressList);
this.latestAddressList = addressList;
this.latestOptions = options;
this.connectToAddressList(addressList, options);
this.latestResolutionNote = resolutionNote;
if (rawAddressList.length > 0) {
return true;
} else {
this.lastError = 'No addresses resolved';
return false;
}
}
exitIdle() {
@ -570,7 +589,8 @@ export class LeafLoadBalancer {
constructor(
private endpoint: Endpoint,
channelControlHelper: ChannelControlHelper,
private options: ChannelOptions
private options: ChannelOptions,
private resolutionNote: string
) {
const childChannelControlHelper = createChildChannelControlHelper(
channelControlHelper,
@ -590,9 +610,10 @@ export class LeafLoadBalancer {
startConnecting() {
this.pickFirstBalancer.updateAddressList(
[this.endpoint],
statusOrFromValue([this.endpoint]),
LEAF_CONFIG,
{ ...this.options, [REPORT_HEALTH_STATUS_OPTION_NAME]: true }
{ ...this.options, [REPORT_HEALTH_STATUS_OPTION_NAME]: true },
this.resolutionNote
);
}

View File

@ -39,6 +39,7 @@ import {
} from './subchannel-address';
import { LeafLoadBalancer } from './load-balancer-pick-first';
import { ChannelOptions } from './channel-options';
import { StatusOr } from './call-interface';
const TRACER_NAME = 'round_robin';
@ -205,14 +206,38 @@ export class RoundRobinLoadBalancer implements LoadBalancer {
for (const child of this.children) {
child.destroy();
}
this.children = [];
}
updateAddressList(
endpointList: Endpoint[],
maybeEndpointList: StatusOr<Endpoint[]>,
lbConfig: TypedLoadBalancingConfig,
options: ChannelOptions
): void {
options: ChannelOptions,
resolutionNote: string
): boolean {
if (!(lbConfig instanceof RoundRobinLoadBalancingConfig)) {
return false;
}
if (!maybeEndpointList.ok) {
if (this.children.length === 0) {
this.updateState(
ConnectivityState.TRANSIENT_FAILURE,
new UnavailablePicker(maybeEndpointList.error),
maybeEndpointList.error.details
);
}
return true;
}
const endpointList = maybeEndpointList.value;
this.resetSubchannelList();
if (endpointList.length === 0) {
const errorMessage = `No addresses resolved. Resolution note: ${resolutionNote}`;
this.updateState(
ConnectivityState.TRANSIENT_FAILURE,
new UnavailablePicker({details: errorMessage}),
errorMessage
);
}
trace('Connect to endpoint list ' + endpointList.map(endpointToString));
this.updatesPaused = true;
this.children = endpointList.map(
@ -220,7 +245,8 @@ export class RoundRobinLoadBalancer implements LoadBalancer {
new LeafLoadBalancer(
endpoint,
this.childChannelControlHelper,
options
options,
resolutionNote
)
);
for (const child of this.children) {
@ -228,6 +254,7 @@ export class RoundRobinLoadBalancer implements LoadBalancer {
}
this.updatesPaused = false;
this.calculateAndUpdateState();
return true;
}
exitIdle(): void {

View File

@ -24,6 +24,7 @@ import { SubchannelInterface } from './subchannel-interface';
import { LoadBalancingConfig } from './service-config';
import { log } from './logging';
import { LogVerbosity } from './constants';
import { StatusOr } from './call-interface';
/**
* A collection of functions associated with a channel that a load balancer
@ -102,12 +103,16 @@ export interface LoadBalancer {
* @param endpointList The new list of addresses to connect to
* @param lbConfig The load balancing config object from the service config,
* if one was provided
* @param channelOptions Channel options from the channel, plus resolver
* attributes
* @param resolutionNote A not from the resolver to include in errors
*/
updateAddressList(
endpointList: Endpoint[],
endpointList: StatusOr<Endpoint[]>,
lbConfig: TypedLoadBalancingConfig,
channelOptions: ChannelOptions
): void;
channelOptions: ChannelOptions,
resolutionNote: string
): boolean;
/**
* If the load balancer is currently in the IDLE state, start connecting.
*/

View File

@ -23,7 +23,7 @@ import {
import { promises as dns } from 'dns';
import { extractAndSelectServiceConfig, ServiceConfig } from './service-config';
import { Status } from './constants';
import { StatusObject } from './call-interface';
import { StatusObject, StatusOr, statusOrFromError, statusOrFromValue } from './call-interface';
import { Metadata } from './metadata';
import * as logging from './logging';
import { LogVerbosity } from './constants';
@ -62,9 +62,8 @@ class DnsResolver implements Resolver {
private readonly minTimeBetweenResolutionsMs: number;
private pendingLookupPromise: Promise<TcpSubchannelAddress[]> | null = null;
private pendingTxtPromise: Promise<string[][]> | null = null;
private latestLookupResult: Endpoint[] | null = null;
private latestServiceConfig: ServiceConfig | null = null;
private latestServiceConfigError: StatusObject | null = null;
private latestLookupResult: StatusOr<Endpoint[]> | null = null;
private latestServiceConfigResult: StatusOr<ServiceConfig> | null = null;
private percentage: number;
private defaultResolutionError: StatusObject;
private backoff: BackoffTimeout;
@ -149,13 +148,12 @@ class DnsResolver implements Resolver {
if (!this.returnedIpResult) {
trace('Returning IP address for target ' + uriToString(this.target));
setImmediate(() => {
this.listener.onSuccessfulResolution(
this.ipResult!,
this.listener(
statusOrFromValue(this.ipResult!),
{},
null,
null,
null,
{}
);
''
)
});
this.returnedIpResult = true;
}
@ -167,11 +165,15 @@ class DnsResolver implements Resolver {
if (this.dnsHostname === null) {
trace('Failed to parse DNS address ' + uriToString(this.target));
setImmediate(() => {
this.listener.onError({
code: Status.UNAVAILABLE,
details: `Failed to parse DNS address ${uriToString(this.target)}`,
metadata: new Metadata(),
});
this.listener(
statusOrFromError({
code: Status.UNAVAILABLE,
details: `Failed to parse DNS address ${uriToString(this.target)}`
}),
{},
null,
''
);
});
this.stopNextResolutionTimer();
} else {
@ -194,11 +196,9 @@ class DnsResolver implements Resolver {
return;
}
this.pendingLookupPromise = null;
this.backoff.reset();
this.backoff.stop();
this.latestLookupResult = addressList.map(address => ({
this.latestLookupResult = statusOrFromValue(addressList.map(address => ({
addresses: [address],
}));
})));
const allAddressesString: string =
'[' +
addressList.map(addr => addr.host + ':' + addr.port).join(',') +
@ -209,21 +209,17 @@ class DnsResolver implements Resolver {
': ' +
allAddressesString
);
if (this.latestLookupResult.length === 0) {
this.listener.onError(this.defaultResolutionError);
return;
}
/* If the TXT lookup has not yet finished, both of the last two
* arguments will be null, which is the equivalent of getting an
* empty TXT response. When the TXT lookup does finish, its handler
* can update the service config by using the same address list */
this.listener.onSuccessfulResolution(
const healthStatus = this.listener(
this.latestLookupResult,
this.latestServiceConfig,
this.latestServiceConfigError,
null,
{}
{},
this.latestServiceConfigResult,
''
);
this.handleHealthStatus(healthStatus);
},
err => {
if (this.pendingLookupPromise === null) {
@ -237,7 +233,12 @@ class DnsResolver implements Resolver {
);
this.pendingLookupPromise = null;
this.stopNextResolutionTimer();
this.listener.onError(this.defaultResolutionError);
this.listener(
statusOrFromError(this.defaultResolutionError),
{},
this.latestServiceConfigResult,
''
)
}
);
/* If there already is a still-pending TXT resolution, we can just use
@ -253,31 +254,35 @@ class DnsResolver implements Resolver {
return;
}
this.pendingTxtPromise = null;
let serviceConfig: ServiceConfig | null;
try {
this.latestServiceConfig = extractAndSelectServiceConfig(
serviceConfig = extractAndSelectServiceConfig(
txtRecord,
this.percentage
);
if (serviceConfig) {
this.latestServiceConfigResult = statusOrFromValue(serviceConfig);
} else {
this.latestServiceConfigResult = null;
}
} catch (err) {
this.latestServiceConfigError = {
this.latestServiceConfigResult = statusOrFromError({
code: Status.UNAVAILABLE,
details: `Parsing service config failed with error ${
(err as Error).message
}`,
metadata: new Metadata(),
};
}`
});
}
if (this.latestLookupResult !== null) {
/* We rely here on the assumption that calling this function with
* identical parameters will be essentialy idempotent, and calling
* it with the same address list and a different service config
* should result in a fast and seamless switchover. */
this.listener.onSuccessfulResolution(
this.listener(
this.latestLookupResult,
this.latestServiceConfig,
this.latestServiceConfigError,
null,
{}
{},
this.latestServiceConfigResult,
''
);
}
},
@ -295,6 +300,21 @@ class DnsResolver implements Resolver {
}
}
/**
* The ResolverListener returns a boolean indicating whether the LB policy
* accepted the resolution result. A false result on an otherwise successful
* resolution should be treated as a resolution failure.
* @param healthStatus
*/
private handleHealthStatus(healthStatus: boolean) {
if (healthStatus) {
this.backoff.stop();
this.backoff.reset();
} else {
this.continueResolving = true;
}
}
private async lookup(hostname: string): Promise<TcpSubchannelAddress[]> {
if (GRPC_NODE_USE_ALTERNATIVE_RESOLVER) {
trace('Using alternative DNS resolver.');
@ -400,8 +420,7 @@ class DnsResolver implements Resolver {
this.pendingLookupPromise = null;
this.pendingTxtPromise = null;
this.latestLookupResult = null;
this.latestServiceConfig = null;
this.latestServiceConfigError = null;
this.latestServiceConfigResult = null;
this.returnedIpResult = false;
}

View File

@ -15,7 +15,7 @@
*/
import { isIPv4, isIPv6 } from 'net';
import { StatusObject } from './call-interface';
import { StatusObject, statusOrFromError, statusOrFromValue } from './call-interface';
import { ChannelOptions } from './channel-options';
import { LogVerbosity, Status } from './constants';
import { Metadata } from './metadata';
@ -92,14 +92,18 @@ class IpResolver implements Resolver {
this.hasReturnedResult = true;
process.nextTick(() => {
if (this.error) {
this.listener.onError(this.error);
this.listener(
statusOrFromError(this.error),
{},
null,
''
);
} else {
this.listener.onSuccessfulResolution(
this.endpoints,
this.listener(
statusOrFromValue(this.endpoints),
{},
null,
null,
null,
{}
''
);
}
});

View File

@ -18,6 +18,7 @@ import { Resolver, ResolverListener, registerResolver } from './resolver';
import { Endpoint } from './subchannel-address';
import { GrpcUri } from './uri-parser';
import { ChannelOptions } from './channel-options';
import { statusOrFromValue } from './call-interface';
class UdsResolver implements Resolver {
private hasReturnedResult = false;
@ -39,12 +40,11 @@ class UdsResolver implements Resolver {
if (!this.hasReturnedResult) {
this.hasReturnedResult = true;
process.nextTick(
this.listener.onSuccessfulResolution,
this.endpoints,
this.listener,
statusOrFromValue(this.endpoints),
{},
null,
null,
null,
{}
''
);
}
}

View File

@ -16,7 +16,7 @@
*/
import { MethodConfig, ServiceConfig } from './service-config';
import { StatusObject } from './call-interface';
import { StatusOr } from './call-interface';
import { Endpoint } from './subchannel-address';
import { GrpcUri, uriToString } from './uri-parser';
import { ChannelOptions } from './channel-options';
@ -24,6 +24,8 @@ import { Metadata } from './metadata';
import { Status } from './constants';
import { Filter, FilterFactory } from './filter';
export const CHANNEL_ARGS_CONFIG_SELECTOR_KEY = 'grpc.internal.config_selector';
export interface CallConfig {
methodConfig: MethodConfig;
onCommitted?: () => void;
@ -41,34 +43,27 @@ export interface ConfigSelector {
unref(): void;
}
/**
* A listener object passed to the resolver's constructor that provides name
* resolution updates back to the resolver's owner.
*/
export interface ResolverListener {
/**
* Called whenever the resolver has new name resolution results to report
* @param addressList The new list of backend addresses
* @param serviceConfig The new service configuration corresponding to the
* `addressList`. Will be `null` if no service configuration was
* retrieved or if the service configuration was invalid
* @param serviceConfigError If non-`null`, indicates that the retrieved
* service configuration was invalid
* Called whenever the resolver has new name resolution results or an error to
* report.
* @param endpointList The list of endpoints, or an error if resolution failed
* @param attributes Arbitrary key/value pairs to pass along to load balancing
* policies
* @param serviceConfig The service service config for the endpoint list, or an
* error if the retrieved service config is invalid, or null if there is no
* service config
* @param resolutionNote Provides additional context to RPC failure status
* messages generated by the load balancing policy.
* @returns Whether or not the load balancing policy accepted the result.
*/
onSuccessfulResolution(
addressList: Endpoint[],
serviceConfig: ServiceConfig | null,
serviceConfigError: StatusObject | null,
configSelector: ConfigSelector | null,
attributes: { [key: string]: unknown }
): void;
/**
* Called whenever a name resolution attempt fails.
* @param error Describes how resolution failed
*/
onError(error: StatusObject): void;
(
endpointList: StatusOr<Endpoint[]>,
attributes: { [key: string]: unknown },
serviceConfig: StatusOr<ServiceConfig> | null,
resolutionNote: string
): boolean;
}
/**
* A resolver class that handles one or more of the name syntax schemes defined
* in the [gRPC Name Resolution document](https://github.com/grpc/grpc/blob/master/doc/naming.md)

View File

@ -27,12 +27,11 @@ import {
validateServiceConfig,
} from './service-config';
import { ConnectivityState } from './connectivity-state';
import { ConfigSelector, createResolver, Resolver } from './resolver';
import { ServiceError } from './call';
import { CHANNEL_ARGS_CONFIG_SELECTOR_KEY, ConfigSelector, createResolver, Resolver } from './resolver';
import { Picker, UnavailablePicker, QueuePicker } from './picker';
import { BackoffOptions, BackoffTimeout } from './backoff-timeout';
import { Status } from './constants';
import { StatusObject } from './call-interface';
import { StatusObject, StatusOr } from './call-interface';
import { Metadata } from './metadata';
import * as logging from './logging';
import { LogVerbosity } from './constants';
@ -251,75 +250,7 @@ export class ResolvingLoadBalancer implements LoadBalancer {
);
this.innerResolver = createResolver(
target,
{
onSuccessfulResolution: (
endpointList: Endpoint[],
serviceConfig: ServiceConfig | null,
serviceConfigError: ServiceError | null,
configSelector: ConfigSelector | null,
attributes: { [key: string]: unknown }
) => {
this.backoffTimeout.stop();
this.backoffTimeout.reset();
let workingServiceConfig: ServiceConfig | null = null;
/* This first group of conditionals implements the algorithm described
* in https://github.com/grpc/proposal/blob/master/A21-service-config-error-handling.md
* in the section called "Behavior on receiving a new gRPC Config".
*/
if (serviceConfig === null) {
// Step 4 and 5
if (serviceConfigError === null) {
// Step 5
this.previousServiceConfig = null;
workingServiceConfig = this.defaultServiceConfig;
} else {
// Step 4
if (this.previousServiceConfig === null) {
// Step 4.ii
this.handleResolutionFailure(serviceConfigError);
} else {
// Step 4.i
workingServiceConfig = this.previousServiceConfig;
}
}
} else {
// Step 3
workingServiceConfig = serviceConfig;
this.previousServiceConfig = serviceConfig;
}
const workingConfigList =
workingServiceConfig?.loadBalancingConfig ?? [];
const loadBalancingConfig = selectLbConfigFromList(
workingConfigList,
true
);
if (loadBalancingConfig === null) {
// There were load balancing configs but none are supported. This counts as a resolution failure
this.handleResolutionFailure({
code: Status.UNAVAILABLE,
details:
'All load balancer options in service config are not compatible',
metadata: new Metadata(),
});
configSelector?.unref();
return;
}
this.childLoadBalancer.updateAddressList(
endpointList,
loadBalancingConfig,
{...this.channelOptions, ...attributes}
);
const finalServiceConfig =
workingServiceConfig ?? this.defaultServiceConfig;
this.onSuccessfulResolution(
finalServiceConfig,
configSelector ?? getDefaultConfigSelector(finalServiceConfig)
);
},
onError: (error: StatusObject) => {
this.handleResolutionFailure(error);
},
},
this.handleResolverResult.bind(this),
channelOptions
);
const backoffOptions: BackoffOptions = {
@ -337,6 +268,62 @@ export class ResolvingLoadBalancer implements LoadBalancer {
this.backoffTimeout.unref();
}
private handleResolverResult(
endpointList: StatusOr<Endpoint[]>,
attributes: { [key: string]: unknown },
serviceConfig: StatusOr<ServiceConfig> | null,
resolutionNote: string
): boolean {
this.backoffTimeout.stop();
this.backoffTimeout.reset();
let resultAccepted = true;
let workingServiceConfig: ServiceConfig | null = null;
if (serviceConfig === null) {
workingServiceConfig = this.defaultServiceConfig;
} else if (serviceConfig.ok) {
workingServiceConfig = serviceConfig.value;
} else {
if (this.previousServiceConfig !== null) {
workingServiceConfig = this.previousServiceConfig;
} else {
resultAccepted = false;
this.handleResolutionFailure(serviceConfig.error);
}
}
if (workingServiceConfig !== null) {
const workingConfigList =
workingServiceConfig?.loadBalancingConfig ?? [];
const loadBalancingConfig = selectLbConfigFromList(
workingConfigList,
true
);
if (loadBalancingConfig === null) {
resultAccepted = false;
this.handleResolutionFailure({
code: Status.UNAVAILABLE,
details:
'All load balancer options in service config are not compatible',
metadata: new Metadata(),
});
} else {
resultAccepted = this.childLoadBalancer.updateAddressList(
endpointList,
loadBalancingConfig,
{...this.channelOptions, ...attributes},
resolutionNote
);
}
}
if (resultAccepted) {
this.onSuccessfulResolution(
workingServiceConfig!,
attributes[CHANNEL_ARGS_CONFIG_SELECTOR_KEY] as ConfigSelector ?? getDefaultConfigSelector(workingServiceConfig!)
);
}
return resultAccepted;
}
private updateResolution() {
this.innerResolver.updateResolution();
if (this.currentState === ConnectivityState.IDLE) {
@ -391,7 +378,7 @@ export class ResolvingLoadBalancer implements LoadBalancer {
}
updateAddressList(
endpointList: Endpoint[],
endpointList: StatusOr<Endpoint[]>,
lbConfig: TypedLoadBalancingConfig | null
): never {
throw new Error('updateAddressList not supported on ResolvingLoadBalancer');

View File

@ -782,27 +782,31 @@ export class Server {
private resolvePort(port: GrpcUri): Promise<SubchannelAddress[]> {
return new Promise<SubchannelAddress[]>((resolve, reject) => {
const resolverListener: ResolverListener = {
onSuccessfulResolution: (
endpointList,
serviceConfig,
serviceConfigError
) => {
// We only want one resolution result. Discard all future results
resolverListener.onSuccessfulResolution = () => {};
const addressList = ([] as SubchannelAddress[]).concat(
...endpointList.map(endpoint => endpoint.addresses)
);
if (addressList.length === 0) {
reject(new Error(`No addresses resolved for port ${port}`));
return;
}
resolve(addressList);
},
onError: error => {
reject(new Error(error.details));
},
};
let seenResolution = false;
const resolverListener: ResolverListener = (
endpointList,
attributes,
serviceConfig,
resolutionNote
) => {
if (seenResolution) {
return true;
}
seenResolution = true;
if (!endpointList.ok) {
reject(new Error(endpointList.error.details));
return true;
}
const addressList = ([] as SubchannelAddress[]).concat(
...endpointList.value.map(endpoint => endpoint.addresses)
);
if (addressList.length === 0) {
reject(new Error(`No addresses resolved for port ${port}`));
return true;
}
resolve(addressList);
return true;
}
const resolver = createResolver(port, resolverListener, this.options);
resolver.updateResolution();
});

View File

@ -31,6 +31,8 @@ import { Metadata } from '../src/metadata';
import { Picker } from '../src/picker';
import { Endpoint, subchannelAddressToString } from '../src/subchannel-address';
import { MockSubchannel, TestClient, TestServer } from './common';
import { statusOrFromError, statusOrFromValue } from '../src/call-interface';
import { Status } from '../src/constants';
function updateStateCallBackForExpectedStateSequence(
expectedStateSequence: ConnectivityState[],
@ -125,9 +127,10 @@ describe('pick_first load balancing policy', () => {
);
const pickFirst = new PickFirstLoadBalancer(channelControlHelper);
pickFirst.updateAddressList(
[{ addresses: [{ host: 'localhost', port: 1 }] }],
statusOrFromValue([{ addresses: [{ host: 'localhost', port: 1 }] }]),
config,
{}
{},
''
);
process.nextTick(() => {
subchannels[0].transitionToState(ConnectivityState.READY);
@ -145,12 +148,13 @@ describe('pick_first load balancing policy', () => {
);
const pickFirst = new PickFirstLoadBalancer(channelControlHelper);
pickFirst.updateAddressList(
[
statusOrFromValue([
{ addresses: [{ host: 'localhost', port: 1 }] },
{ addresses: [{ host: 'localhost', port: 2 }] },
],
]),
config,
{}
{},
''
);
process.nextTick(() => {
subchannels[1].transitionToState(ConnectivityState.READY);
@ -168,16 +172,17 @@ describe('pick_first load balancing policy', () => {
);
const pickFirst = new PickFirstLoadBalancer(channelControlHelper);
pickFirst.updateAddressList(
[
statusOrFromValue([
{
addresses: [
{ host: 'localhost', port: 1 },
{ host: 'localhost', port: 2 },
],
},
],
]),
config,
{}
{},
''
);
process.nextTick(() => {
subchannels[1].transitionToState(ConnectivityState.READY);
@ -203,9 +208,10 @@ describe('pick_first load balancing policy', () => {
);
const pickFirst = new PickFirstLoadBalancer(channelControlHelper);
pickFirst.updateAddressList(
[{ addresses: [{ host: 'localhost', port: 1 }] }],
statusOrFromValue([{ addresses: [{ host: 'localhost', port: 1 }] }]),
config,
{}
{},
''
);
});
it('Should stay CONNECTING if only some subchannels fail to connect', done => {
@ -220,12 +226,13 @@ describe('pick_first load balancing policy', () => {
);
const pickFirst = new PickFirstLoadBalancer(channelControlHelper);
pickFirst.updateAddressList(
[
statusOrFromValue([
{ addresses: [{ host: 'localhost', port: 1 }] },
{ addresses: [{ host: 'localhost', port: 2 }] },
],
]),
config,
{}
{},
''
);
process.nextTick(() => {
subchannels[0].transitionToState(ConnectivityState.TRANSIENT_FAILURE);
@ -243,12 +250,13 @@ describe('pick_first load balancing policy', () => {
);
const pickFirst = new PickFirstLoadBalancer(channelControlHelper);
pickFirst.updateAddressList(
[
statusOrFromValue([
{ addresses: [{ host: 'localhost', port: 1 }] },
{ addresses: [{ host: 'localhost', port: 2 }] },
],
]),
config,
{}
{},
''
);
process.nextTick(() => {
subchannels[0].transitionToState(ConnectivityState.TRANSIENT_FAILURE);
@ -269,12 +277,13 @@ describe('pick_first load balancing policy', () => {
);
const pickFirst = new PickFirstLoadBalancer(channelControlHelper);
pickFirst.updateAddressList(
[
statusOrFromValue([
{ addresses: [{ host: 'localhost', port: 1 }] },
{ addresses: [{ host: 'localhost', port: 2 }] },
],
]),
config,
{}
{},
''
);
process.nextTick(() => {
subchannels[0].transitionToState(ConnectivityState.TRANSIENT_FAILURE);
@ -309,12 +318,13 @@ describe('pick_first load balancing policy', () => {
);
const pickFirst = new PickFirstLoadBalancer(channelControlHelper);
pickFirst.updateAddressList(
[
statusOrFromValue([
{ addresses: [{ host: 'localhost', port: 1 }] },
{ addresses: [{ host: 'localhost', port: 2 }] },
],
]),
config,
{}
{},
''
);
});
it('Should enter READY if a subchannel connects after entering TRANSIENT_FAILURE mode', done => {
@ -337,12 +347,13 @@ describe('pick_first load balancing policy', () => {
);
const pickFirst = new PickFirstLoadBalancer(channelControlHelper);
pickFirst.updateAddressList(
[
statusOrFromValue([
{ addresses: [{ host: 'localhost', port: 1 }] },
{ addresses: [{ host: 'localhost', port: 2 }] },
],
]),
config,
{}
{},
''
);
process.nextTick(() => {
subchannels[0].transitionToState(ConnectivityState.READY);
@ -369,22 +380,24 @@ describe('pick_first load balancing policy', () => {
);
const pickFirst = new PickFirstLoadBalancer(channelControlHelper);
pickFirst.updateAddressList(
[
statusOrFromValue([
{ addresses: [{ host: 'localhost', port: 1 }] },
{ addresses: [{ host: 'localhost', port: 2 }] },
],
]),
config,
{}
{},
''
);
process.nextTick(() => {
currentStartState = ConnectivityState.CONNECTING;
pickFirst.updateAddressList(
[
statusOrFromValue([
{ addresses: [{ host: 'localhost', port: 1 }] },
{ addresses: [{ host: 'localhost', port: 2 }] },
],
]),
config,
{}
{},
''
);
});
});
@ -409,19 +422,21 @@ describe('pick_first load balancing policy', () => {
);
const pickFirst = new PickFirstLoadBalancer(channelControlHelper);
pickFirst.updateAddressList(
[
statusOrFromValue([
{ addresses: [{ host: 'localhost', port: 1 }] },
{ addresses: [{ host: 'localhost', port: 2 }] },
],
]),
config,
{}
{},
''
);
process.nextTick(() => {
currentStartState = ConnectivityState.READY;
pickFirst.updateAddressList(
[{ addresses: [{ host: 'localhost', port: 3 }] }],
statusOrFromValue([{ addresses: [{ host: 'localhost', port: 3 }] }]),
config,
{}
{},
''
);
});
});
@ -446,9 +461,10 @@ describe('pick_first load balancing policy', () => {
);
const pickFirst = new PickFirstLoadBalancer(channelControlHelper);
pickFirst.updateAddressList(
[{ addresses: [{ host: 'localhost', port: 1 }] }],
statusOrFromValue([{ addresses: [{ host: 'localhost', port: 1 }] }]),
config,
{}
{},
''
);
process.nextTick(() => {
subchannels[0].transitionToState(ConnectivityState.IDLE);
@ -475,16 +491,18 @@ describe('pick_first load balancing policy', () => {
);
const pickFirst = new PickFirstLoadBalancer(channelControlHelper);
pickFirst.updateAddressList(
[{ addresses: [{ host: 'localhost', port: 1 }] }],
statusOrFromValue([{ addresses: [{ host: 'localhost', port: 1 }] }]),
config,
{}
{},
''
);
process.nextTick(() => {
currentStartState = ConnectivityState.IDLE;
pickFirst.updateAddressList(
[{ addresses: [{ host: 'localhost', port: 2 }] }],
statusOrFromValue([{ addresses: [{ host: 'localhost', port: 2 }] }]),
config,
{}
{},
''
);
process.nextTick(() => {
subchannels[0].transitionToState(ConnectivityState.IDLE);
@ -512,16 +530,18 @@ describe('pick_first load balancing policy', () => {
);
const pickFirst = new PickFirstLoadBalancer(channelControlHelper);
pickFirst.updateAddressList(
[{ addresses: [{ host: 'localhost', port: 1 }] }],
statusOrFromValue([{ addresses: [{ host: 'localhost', port: 1 }] }]),
config,
{}
{},
''
);
process.nextTick(() => {
currentStartState = ConnectivityState.TRANSIENT_FAILURE;
pickFirst.updateAddressList(
[{ addresses: [{ host: 'localhost', port: 2 }] }],
statusOrFromValue([{ addresses: [{ host: 'localhost', port: 2 }] }]),
config,
{}
{},
''
);
process.nextTick(() => {
subchannels[0].transitionToState(ConnectivityState.IDLE);
@ -549,15 +569,17 @@ describe('pick_first load balancing policy', () => {
);
const pickFirst = new PickFirstLoadBalancer(channelControlHelper);
pickFirst.updateAddressList(
[{ addresses: [{ host: 'localhost', port: 1 }] }],
statusOrFromValue([{ addresses: [{ host: 'localhost', port: 1 }] }]),
config,
{}
{},
''
);
process.nextTick(() => {
pickFirst.updateAddressList(
[{ addresses: [{ host: 'localhost', port: 2 }] }],
statusOrFromValue([{ addresses: [{ host: 'localhost', port: 2 }] }]),
config,
{}
{},
''
);
process.nextTick(() => {
subchannels[0].transitionToState(ConnectivityState.IDLE);
@ -597,25 +619,28 @@ describe('pick_first load balancing policy', () => {
);
const pickFirst = new PickFirstLoadBalancer(channelControlHelper);
pickFirst.updateAddressList(
[{ addresses: [{ host: 'localhost', port: 1 }] }],
statusOrFromValue([{ addresses: [{ host: 'localhost', port: 1 }] }]),
config,
{}
{},
''
);
process.nextTick(() => {
subchannels[0].transitionToState(ConnectivityState.TRANSIENT_FAILURE);
process.nextTick(() => {
pickFirst.updateAddressList(
[{ addresses: [{ host: 'localhost', port: 2 }] }],
statusOrFromValue([{ addresses: [{ host: 'localhost', port: 2 }] }]),
config,
{}
{},
''
);
process.nextTick(() => {
subchannels[1].transitionToState(ConnectivityState.TRANSIENT_FAILURE);
process.nextTick(() => {
pickFirst.updateAddressList(
[{ addresses: [{ host: 'localhost', port: 3 }] }],
statusOrFromValue([{ addresses: [{ host: 'localhost', port: 3 }] }]),
config,
{}
{},
''
);
process.nextTick(() => {
subchannels[2].transitionToState(
@ -660,21 +685,24 @@ describe('pick_first load balancing policy', () => {
);
const pickFirst = new PickFirstLoadBalancer(channelControlHelper);
pickFirst.updateAddressList(
[{ addresses: [{ host: 'localhost', port: 1 }] }],
statusOrFromValue([{ addresses: [{ host: 'localhost', port: 1 }] }]),
config,
{}
{},
''
);
process.nextTick(() => {
pickFirst.updateAddressList(
[{ addresses: [{ host: 'localhost', port: 2 }] }],
statusOrFromValue([{ addresses: [{ host: 'localhost', port: 2 }] }]),
config,
{}
{},
''
);
process.nextTick(() => {
pickFirst.updateAddressList(
[{ addresses: [{ host: 'localhost', port: 2 }] }],
statusOrFromValue([{ addresses: [{ host: 'localhost', port: 2 }] }]),
config,
{}
{},
''
);
});
});
@ -704,9 +732,10 @@ describe('pick_first load balancing policy', () => {
);
const pickFirst = new PickFirstLoadBalancer(channelControlHelper);
pickFirst.updateAddressList(
[{ addresses: [{ host: 'localhost', port: 1 }] }],
statusOrFromValue([{ addresses: [{ host: 'localhost', port: 1 }] }]),
config,
{}
{},
''
);
process.nextTick(() => {
subchannels[0].transitionToState(ConnectivityState.IDLE);
@ -726,7 +755,20 @@ describe('pick_first load balancing policy', () => {
}
);
const pickFirst = new PickFirstLoadBalancer(channelControlHelper);
pickFirst.updateAddressList([], config, {});
pickFirst.updateAddressList(statusOrFromValue([]), config, {}, '');
});
it('Should report TRANSIENT_FAILURE with an endpoint list error', done => {
const channelControlHelper = createChildChannelControlHelper(
baseChannelControlHelper,
{
updateState: updateStateCallBackForExpectedStateSequence(
[ConnectivityState.TRANSIENT_FAILURE],
done
),
}
);
const pickFirst = new PickFirstLoadBalancer(channelControlHelper);
pickFirst.updateAddressList(statusOrFromError({code: Status.UNAVAILABLE, details: 'Resolver error'}), config, {}, '');
});
describe('Address list randomization', () => {
const shuffleConfig = new PickFirstLoadBalancingConfig(true);
@ -760,20 +802,21 @@ describe('pick_first load balancing policy', () => {
for (let i = 0; i < 10; i++) {
endpoints.push({ addresses: [{ host: 'localhost', port: i + 1 }] });
}
const endpointList = statusOrFromValue(endpoints);
const pickFirst = new PickFirstLoadBalancer(channelControlHelper);
/* Pick from 10 subchannels 5 times, with address randomization enabled,
* and verify that at least two different subchannels are picked. The
* probability choosing the same address every time is 1/10,000, which
* I am considering an acceptable flake rate */
pickFirst.updateAddressList(endpoints, shuffleConfig, {});
pickFirst.updateAddressList(endpointList, shuffleConfig, {}, '');
process.nextTick(() => {
pickFirst.updateAddressList(endpoints, shuffleConfig, {});
pickFirst.updateAddressList(endpointList, shuffleConfig, {}, '');
process.nextTick(() => {
pickFirst.updateAddressList(endpoints, shuffleConfig, {});
pickFirst.updateAddressList(endpointList, shuffleConfig, {}, '');
process.nextTick(() => {
pickFirst.updateAddressList(endpoints, shuffleConfig, {});
pickFirst.updateAddressList(endpointList, shuffleConfig, {}, '');
process.nextTick(() => {
pickFirst.updateAddressList(endpoints, shuffleConfig, {});
pickFirst.updateAddressList(endpointList, shuffleConfig, {}, '');
process.nextTick(() => {
assert(pickedSubchannels.size > 1);
done();
@ -816,16 +859,17 @@ describe('pick_first load balancing policy', () => {
for (let i = 0; i < 10; i++) {
endpoints.push({ addresses: [{ host: 'localhost', port: i + 1 }] });
}
const endpointList = statusOrFromValue(endpoints);
const pickFirst = new PickFirstLoadBalancer(channelControlHelper);
pickFirst.updateAddressList(endpoints, config, {});
pickFirst.updateAddressList(endpointList, config, {}, '');
process.nextTick(() => {
pickFirst.updateAddressList(endpoints, config, {});
pickFirst.updateAddressList(endpointList, config, {}, '');
process.nextTick(() => {
pickFirst.updateAddressList(endpoints, config, {});
pickFirst.updateAddressList(endpointList, config, {}, '');
process.nextTick(() => {
pickFirst.updateAddressList(endpoints, config, {});
pickFirst.updateAddressList(endpointList, config, {}, '');
process.nextTick(() => {
pickFirst.updateAddressList(endpoints, config, {});
pickFirst.updateAddressList(endpointList, config, {}, '');
process.nextTick(() => {
assert(pickedSubchannels.size === 1);
done();

View File

@ -23,7 +23,7 @@ import * as resolver_dns from '../src/resolver-dns';
import * as resolver_uds from '../src/resolver-uds';
import * as resolver_ip from '../src/resolver-ip';
import { ServiceConfig } from '../src/service-config';
import { StatusObject } from '../src/call-interface';
import { StatusOr } from '../src/call-interface';
import {
Endpoint,
SubchannelAddress,
@ -63,25 +63,27 @@ describe('Name Resolver', () => {
const target = resolverManager.mapUriDefaultScheme(
parseUri('localhost:50051')!
)!;
const listener: resolverManager.ResolverListener = {
onSuccessfulResolution: (
endpointList: Endpoint[],
serviceConfig: ServiceConfig | null,
serviceConfigError: StatusObject | null
) => {
// Only handle the first resolution result
listener.onSuccessfulResolution = () => {};
assert(
hasMatchingAddress(endpointList, { host: '127.0.0.1', port: 50051 })
);
assert(
hasMatchingAddress(endpointList, { host: '::1', port: 50051 })
);
done();
},
onError: (error: StatusObject) => {
done(new Error(`Failed with status ${error.details}`));
},
let resultSeen = false;
const listener: resolverManager.ResolverListener = (
maybeEndpointList: StatusOr<Endpoint[]>,
attributes: { [key: string]: unknown},
serviceConfig: StatusOr<ServiceConfig> | null,
resolutionNote: string
) => {
if (resultSeen) {
return true;
}
resultSeen = true;
assert(maybeEndpointList.ok);
const endpointList = maybeEndpointList.value;
assert(
hasMatchingAddress(endpointList, { host: '127.0.0.1', port: 50051 })
);
assert(
hasMatchingAddress(endpointList, { host: '::1', port: 50051 })
);
done();
return true;
};
const resolver = resolverManager.createResolver(target, listener, {});
resolver.updateResolution();
@ -93,65 +95,71 @@ describe('Name Resolver', () => {
const target = resolverManager.mapUriDefaultScheme(
parseUri('localhost')!
)!;
const listener: resolverManager.ResolverListener = {
onSuccessfulResolution: (
endpointList: Endpoint[],
serviceConfig: ServiceConfig | null,
serviceConfigError: StatusObject | null
) => {
// Only handle the first resolution result
listener.onSuccessfulResolution = () => {};
assert(
hasMatchingAddress(endpointList, { host: '127.0.0.1', port: 443 })
);
assert(hasMatchingAddress(endpointList, { host: '::1', port: 443 }));
done();
},
onError: (error: StatusObject) => {
done(new Error(`Failed with status ${error.details}`));
},
let resultSeen = false;
const listener: resolverManager.ResolverListener = (
maybeEndpointList: StatusOr<Endpoint[]>,
attributes: { [key: string]: unknown},
serviceConfig: StatusOr<ServiceConfig> | null,
resolutionNote: string
) => {
if (resultSeen) {
return true;
}
resultSeen = true;
assert(maybeEndpointList.ok);
const endpointList = maybeEndpointList.value;
assert(
hasMatchingAddress(endpointList, { host: '127.0.0.1', port: 443 })
);
assert(hasMatchingAddress(endpointList, { host: '::1', port: 443 }));
done();
return true;
};
const resolver = resolverManager.createResolver(target, listener, {});
resolver.updateResolution();
});
it('Should correctly represent an ipv4 address', done => {
const target = resolverManager.mapUriDefaultScheme(parseUri('1.2.3.4')!)!;
const listener: resolverManager.ResolverListener = {
onSuccessfulResolution: (
endpointList: Endpoint[],
serviceConfig: ServiceConfig | null,
serviceConfigError: StatusObject | null
) => {
// Only handle the first resolution result
listener.onSuccessfulResolution = () => {};
assert(
hasMatchingAddress(endpointList, { host: '1.2.3.4', port: 443 })
);
done();
},
onError: (error: StatusObject) => {
done(new Error(`Failed with status ${error.details}`));
},
let resultSeen = false;
const listener: resolverManager.ResolverListener = (
maybeEndpointList: StatusOr<Endpoint[]>,
attributes: { [key: string]: unknown},
serviceConfig: StatusOr<ServiceConfig> | null,
resolutionNote: string
) => {
if (resultSeen) {
return true;
}
resultSeen = true;
assert(maybeEndpointList.ok);
const endpointList = maybeEndpointList.value;
assert(
hasMatchingAddress(endpointList, { host: '1.2.3.4', port: 443 })
);
done();
return true;
};
const resolver = resolverManager.createResolver(target, listener, {});
resolver.updateResolution();
});
it('Should correctly represent an ipv6 address', done => {
const target = resolverManager.mapUriDefaultScheme(parseUri('::1')!)!;
const listener: resolverManager.ResolverListener = {
onSuccessfulResolution: (
endpointList: Endpoint[],
serviceConfig: ServiceConfig | null,
serviceConfigError: StatusObject | null
) => {
// Only handle the first resolution result
listener.onSuccessfulResolution = () => {};
assert(hasMatchingAddress(endpointList, { host: '::1', port: 443 }));
done();
},
onError: (error: StatusObject) => {
done(new Error(`Failed with status ${error.details}`));
},
let resultSeen = false;
const listener: resolverManager.ResolverListener = (
maybeEndpointList: StatusOr<Endpoint[]>,
attributes: { [key: string]: unknown},
serviceConfig: StatusOr<ServiceConfig> | null,
resolutionNote: string
) => {
if (resultSeen) {
return true;
}
resultSeen = true;
assert(maybeEndpointList.ok);
const endpointList = maybeEndpointList.value;
assert(hasMatchingAddress(endpointList, { host: '::1', port: 443 }));
done();
return true;
};
const resolver = resolverManager.createResolver(target, listener, {});
resolver.updateResolution();
@ -160,22 +168,24 @@ describe('Name Resolver', () => {
const target = resolverManager.mapUriDefaultScheme(
parseUri('[::1]:50051')!
)!;
const listener: resolverManager.ResolverListener = {
onSuccessfulResolution: (
endpointList: Endpoint[],
serviceConfig: ServiceConfig | null,
serviceConfigError: StatusObject | null
) => {
// Only handle the first resolution result
listener.onSuccessfulResolution = () => {};
assert(
hasMatchingAddress(endpointList, { host: '::1', port: 50051 })
);
done();
},
onError: (error: StatusObject) => {
done(new Error(`Failed with status ${error.details}`));
},
let resultSeen = false;
const listener: resolverManager.ResolverListener = (
maybeEndpointList: StatusOr<Endpoint[]>,
attributes: { [key: string]: unknown},
serviceConfig: StatusOr<ServiceConfig> | null,
resolutionNote: string
) => {
if (resultSeen) {
return true;
}
resultSeen = true;
assert(maybeEndpointList.ok);
const endpointList = maybeEndpointList.value;
assert(
hasMatchingAddress(endpointList, { host: '::1', port: 50051 })
);
done();
return true;
};
const resolver = resolverManager.createResolver(target, listener, {});
resolver.updateResolution();
@ -184,20 +194,22 @@ describe('Name Resolver', () => {
const target = resolverManager.mapUriDefaultScheme(
parseUri('example.com')!
)!;
const listener: resolverManager.ResolverListener = {
onSuccessfulResolution: (
endpointList: Endpoint[],
serviceConfig: ServiceConfig | null,
serviceConfigError: StatusObject | null
) => {
// Only handle the first resolution result
listener.onSuccessfulResolution = () => {};
assert(endpointList.length > 0);
done();
},
onError: (error: StatusObject) => {
done(new Error(`Failed with status ${error.details}`));
},
let resultSeen = false;
const listener: resolverManager.ResolverListener = (
maybeEndpointList: StatusOr<Endpoint[]>,
attributes: { [key: string]: unknown},
serviceConfig: StatusOr<ServiceConfig> | null,
resolutionNote: string
) => {
if (resultSeen) {
return true;
}
resultSeen = true;
assert(maybeEndpointList.ok);
const endpointList = maybeEndpointList.value;
assert(endpointList.length > 0);
done();
return true;
};
const resolver = resolverManager.createResolver(target, listener, {});
resolver.updateResolution();
@ -208,23 +220,21 @@ describe('Name Resolver', () => {
const target = resolverManager.mapUriDefaultScheme(
parseUri('grpctest.kleinsch.com')!
)!;
const listener: resolverManager.ResolverListener = {
onSuccessfulResolution: (
endpointList: Endpoint[],
serviceConfig: ServiceConfig | null,
serviceConfigError: StatusObject | null
) => {
if (serviceConfig !== null) {
assert(
serviceConfig.loadBalancingPolicy === 'round_robin',
'Should have found round robin LB policy'
);
done();
}
},
onError: (error: StatusObject) => {
done(new Error(`Failed with status ${error.details}`));
},
const listener: resolverManager.ResolverListener = (
maybeEndpointList: StatusOr<Endpoint[]>,
attributes: { [key: string]: unknown},
serviceConfig: StatusOr<ServiceConfig> | null,
resolutionNote: string
) => {
if (serviceConfig !== null) {
assert(serviceConfig.ok);
assert(
serviceConfig.value.loadBalancingPolicy === 'round_robin',
'Should have found round robin LB policy'
);
done();
}
return true;
};
const resolver = resolverManager.createResolver(target, listener, {});
resolver.updateResolution();
@ -234,21 +244,18 @@ describe('Name Resolver', () => {
parseUri('grpctest.kleinsch.com')!
)!;
let count = 0;
const listener: resolverManager.ResolverListener = {
onSuccessfulResolution: (
endpointList: Endpoint[],
serviceConfig: ServiceConfig | null,
serviceConfigError: StatusObject | null
) => {
assert(
serviceConfig === null,
'Should not have found service config'
);
count++;
},
onError: (error: StatusObject) => {
done(new Error(`Failed with status ${error.details}`));
},
const listener: resolverManager.ResolverListener = (
maybeEndpointList: StatusOr<Endpoint[]>,
attributes: { [key: string]: unknown},
serviceConfig: StatusOr<ServiceConfig> | null,
resolutionNote: string
) => {
assert(
serviceConfig === null,
'Should not have found service config'
);
count++;
return true;
};
const resolver = resolverManager.createResolver(target, listener, {
'grpc.service_config_disable_resolution': 1,
@ -271,25 +278,27 @@ describe('Name Resolver', () => {
const target = resolverManager.mapUriDefaultScheme(
parseUri('loopback4.unittest.grpc.io')!
)!;
const listener: resolverManager.ResolverListener = {
onSuccessfulResolution: (
endpointList: Endpoint[],
serviceConfig: ServiceConfig | null,
serviceConfigError: StatusObject | null
) => {
// Only handle the first resolution result
listener.onSuccessfulResolution = () => {};
assert(
hasMatchingAddress(endpointList, { host: '127.0.0.1', port: 443 }),
`None of [${endpointList.map(addr =>
endpointToString(addr)
)}] matched '127.0.0.1:443'`
);
done();
},
onError: (error: StatusObject) => {
done(new Error(`Failed with status ${error.details}`));
},
let resultSeen = false;
const listener: resolverManager.ResolverListener = (
maybeEndpointList: StatusOr<Endpoint[]>,
attributes: { [key: string]: unknown},
serviceConfig: StatusOr<ServiceConfig> | null,
resolutionNote: string
) => {
if (resultSeen) {
return true;
}
resultSeen = true;
assert(maybeEndpointList.ok);
const endpointList = maybeEndpointList.value;
assert(
hasMatchingAddress(endpointList, { host: '127.0.0.1', port: 443 }),
`None of [${endpointList.map(addr =>
endpointToString(addr)
)}] matched '127.0.0.1:443'`
);
done();
return true;
};
const resolver = resolverManager.createResolver(target, listener, {});
resolver.updateResolution();
@ -300,20 +309,22 @@ describe('Name Resolver', () => {
const target = resolverManager.mapUriDefaultScheme(
parseUri('loopback6.unittest.grpc.io')!
)!;
const listener: resolverManager.ResolverListener = {
onSuccessfulResolution: (
endpointList: Endpoint[],
serviceConfig: ServiceConfig | null,
serviceConfigError: StatusObject | null
) => {
// Only handle the first resolution result
listener.onSuccessfulResolution = () => {};
assert(hasMatchingAddress(endpointList, { host: '::1', port: 443 }));
done();
},
onError: (error: StatusObject) => {
done(new Error(`Failed with status ${error.details}`));
},
let resultSeen = false;
const listener: resolverManager.ResolverListener = (
maybeEndpointList: StatusOr<Endpoint[]>,
attributes: { [key: string]: unknown},
serviceConfig: StatusOr<ServiceConfig> | null,
resolutionNote: string
) => {
if (resultSeen) {
return true;
}
resultSeen = true;
assert(maybeEndpointList.ok);
const endpointList = maybeEndpointList.value;
assert(hasMatchingAddress(endpointList, { host: '::1', port: 443 }));
done();
return true;
};
const resolver = resolverManager.createResolver(target, listener, {});
resolver.updateResolution();
@ -325,27 +336,27 @@ describe('Name Resolver', () => {
const target = resolverManager.mapUriDefaultScheme(
parseUri('loopback46.unittest.grpc.io')!
)!;
const listener: resolverManager.ResolverListener = {
onSuccessfulResolution: (
endpointList: Endpoint[],
serviceConfig: ServiceConfig | null,
serviceConfigError: StatusObject | null
) => {
// Only handle the first resolution result
listener.onSuccessfulResolution = () => {};
assert(
hasMatchingAddress(endpointList, { host: '127.0.0.1', port: 443 }),
`None of [${endpointList.map(addr =>
endpointToString(addr)
)}] matched '127.0.0.1:443'`
);
/* TODO(murgatroid99): check for IPv6 result, once we can get that
* consistently */
done();
},
onError: (error: StatusObject) => {
done(new Error(`Failed with status ${error.details}`));
},
let resultSeen = false;
const listener: resolverManager.ResolverListener = (
maybeEndpointList: StatusOr<Endpoint[]>,
attributes: { [key: string]: unknown},
serviceConfig: StatusOr<ServiceConfig> | null,
resolutionNote: string
) => {
if (resultSeen) {
return true;
}
resultSeen = true;
assert(maybeEndpointList.ok);
const endpointList = maybeEndpointList.value;
assert(
hasMatchingAddress(endpointList, { host: '127.0.0.1', port: 443 }),
`None of [${endpointList.map(addr =>
endpointToString(addr)
)}] matched '127.0.0.1:443'`
);
done();
return true;
};
const resolver = resolverManager.createResolver(target, listener, {});
resolver.updateResolution();
@ -356,20 +367,22 @@ describe('Name Resolver', () => {
const target = resolverManager.mapUriDefaultScheme(
parseUri('network-tools.com')!
)!;
const listener: resolverManager.ResolverListener = {
onSuccessfulResolution: (
endpointList: Endpoint[],
serviceConfig: ServiceConfig | null,
serviceConfigError: StatusObject | null
) => {
// Only handle the first resolution result
listener.onSuccessfulResolution = () => {};
assert(endpointList.length > 0);
done();
},
onError: (error: StatusObject) => {
done(new Error(`Failed with status ${error.details}`));
},
let resultSeen = false;
const listener: resolverManager.ResolverListener = (
maybeEndpointList: StatusOr<Endpoint[]>,
attributes: { [key: string]: unknown},
serviceConfig: StatusOr<ServiceConfig> | null,
resolutionNote: string
) => {
if (resultSeen) {
return true;
}
resultSeen = true;
assert(maybeEndpointList.ok);
const endpointList = maybeEndpointList.value;
assert(endpointList.length > 0);
done();
return true;
};
const resolver = resolverManager.createResolver(target, listener, {});
resolver.updateResolution();
@ -386,23 +399,23 @@ describe('Name Resolver', () => {
const target2 = resolverManager.mapUriDefaultScheme(
parseUri('grpc-test4.sandbox.googleapis.com')!
)!;
const listener: resolverManager.ResolverListener = {
onSuccessfulResolution: (
endpointList: Endpoint[],
serviceConfig: ServiceConfig | null,
serviceConfigError: StatusObject | null
) => {
assert(endpointList.length > 0);
completeCount += 1;
if (completeCount === 2) {
// Only handle the first resolution result
listener.onSuccessfulResolution = () => {};
done();
}
},
onError: (error: StatusObject) => {
done(new Error(`Failed with status ${error.details}`));
},
const listener: resolverManager.ResolverListener = (
maybeEndpointList: StatusOr<Endpoint[]>,
attributes: { [key: string]: unknown},
serviceConfig: StatusOr<ServiceConfig> | null,
resolutionNote: string
) => {
if (completeCount >= 2) {
return true;
}
assert(maybeEndpointList.ok);
const endpointList = maybeEndpointList.value;
assert(endpointList.length > 0);
completeCount += 1;
if (completeCount === 2) {
done();
}
return true;
};
const resolver1 = resolverManager.createResolver(target1, listener, {});
resolver1.updateResolution();
@ -419,26 +432,25 @@ describe('Name Resolver', () => {
let resultCount = 0;
const resolver = resolverManager.createResolver(
target,
{
onSuccessfulResolution: (
endpointList: Endpoint[],
serviceConfig: ServiceConfig | null,
serviceConfigError: StatusObject | null
) => {
assert(
hasMatchingAddress(endpointList, { host: '127.0.0.1', port: 443 })
);
assert(
hasMatchingAddress(endpointList, { host: '::1', port: 443 })
);
resultCount += 1;
if (resultCount === 1) {
process.nextTick(() => resolver.updateResolution());
}
},
onError: (error: StatusObject) => {
assert.ifError(error);
},
(
maybeEndpointList: StatusOr<Endpoint[]>,
attributes: { [key: string]: unknown},
serviceConfig: StatusOr<ServiceConfig> | null,
resolutionNote: string
) => {
assert(maybeEndpointList.ok);
const endpointList = maybeEndpointList.value;
assert(
hasMatchingAddress(endpointList, { host: '127.0.0.1', port: 443 })
);
assert(
hasMatchingAddress(endpointList, { host: '::1', port: 443 })
);
resultCount += 1;
if (resultCount === 1) {
process.nextTick(() => resolver.updateResolution());
}
return true;
},
{ 'grpc.dns_min_time_between_resolutions_ms': 2000 }
);
@ -455,20 +467,18 @@ describe('Name Resolver', () => {
let resultCount = 0;
const resolver = resolverManager.createResolver(
target,
{
onSuccessfulResolution: (
endpointList: Endpoint[],
serviceConfig: ServiceConfig | null,
serviceConfigError: StatusObject | null
) => {
assert.fail('Resolution succeeded unexpectedly');
},
onError: (error: StatusObject) => {
resultCount += 1;
if (resultCount === 1) {
process.nextTick(() => resolver.updateResolution());
}
},
(
maybeEndpointList: StatusOr<Endpoint[]>,
attributes: { [key: string]: unknown},
serviceConfig: StatusOr<ServiceConfig> | null,
resolutionNote: string
) => {
assert(!maybeEndpointList.ok);
resultCount += 1;
if (resultCount === 1) {
process.nextTick(() => resolver.updateResolution());
}
return true;
},
{}
);
@ -484,20 +494,22 @@ describe('Name Resolver', () => {
const target = resolverManager.mapUriDefaultScheme(
parseUri('unix:socket')!
)!;
const listener: resolverManager.ResolverListener = {
onSuccessfulResolution: (
endpointList: Endpoint[],
serviceConfig: ServiceConfig | null,
serviceConfigError: StatusObject | null
) => {
// Only handle the first resolution result
listener.onSuccessfulResolution = () => {};
assert(hasMatchingAddress(endpointList, { path: 'socket' }));
done();
},
onError: (error: StatusObject) => {
done(new Error(`Failed with status ${error.details}`));
},
let resultSeen = false;
const listener: resolverManager.ResolverListener = (
maybeEndpointList: StatusOr<Endpoint[]>,
attributes: { [key: string]: unknown},
serviceConfig: StatusOr<ServiceConfig> | null,
resolutionNote: string
) => {
if (resultSeen) {
return true;
}
resultSeen = true;
assert(maybeEndpointList.ok);
const endpointList = maybeEndpointList.value;
assert(hasMatchingAddress(endpointList, { path: 'socket' }));
done();
return true;
};
const resolver = resolverManager.createResolver(target, listener, {});
resolver.updateResolution();
@ -506,20 +518,22 @@ describe('Name Resolver', () => {
const target = resolverManager.mapUriDefaultScheme(
parseUri('unix:///tmp/socket')!
)!;
const listener: resolverManager.ResolverListener = {
onSuccessfulResolution: (
endpointList: Endpoint[],
serviceConfig: ServiceConfig | null,
serviceConfigError: StatusObject | null
) => {
// Only handle the first resolution result
listener.onSuccessfulResolution = () => {};
assert(hasMatchingAddress(endpointList, { path: '/tmp/socket' }));
done();
},
onError: (error: StatusObject) => {
done(new Error(`Failed with status ${error.details}`));
},
let resultSeen = false;
const listener: resolverManager.ResolverListener = (
maybeEndpointList: StatusOr<Endpoint[]>,
attributes: { [key: string]: unknown},
serviceConfig: StatusOr<ServiceConfig> | null,
resolutionNote: string
) => {
if (resultSeen) {
return true;
}
resultSeen = true;
assert(maybeEndpointList.ok);
const endpointList = maybeEndpointList.value;
assert(hasMatchingAddress(endpointList, { path: '/tmp/socket' }));
done();
return true;
};
const resolver = resolverManager.createResolver(target, listener, {});
resolver.updateResolution();
@ -530,22 +544,24 @@ describe('Name Resolver', () => {
const target = resolverManager.mapUriDefaultScheme(
parseUri('ipv4:127.0.0.1')!
)!;
const listener: resolverManager.ResolverListener = {
onSuccessfulResolution: (
endpointList: Endpoint[],
serviceConfig: ServiceConfig | null,
serviceConfigError: StatusObject | null
) => {
// Only handle the first resolution result
listener.onSuccessfulResolution = () => {};
assert(
hasMatchingAddress(endpointList, { host: '127.0.0.1', port: 443 })
);
done();
},
onError: (error: StatusObject) => {
done(new Error(`Failed with status ${error.details}`));
},
let resultSeen = false;
const listener: resolverManager.ResolverListener = (
maybeEndpointList: StatusOr<Endpoint[]>,
attributes: { [key: string]: unknown},
serviceConfig: StatusOr<ServiceConfig> | null,
resolutionNote: string
) => {
if (resultSeen) {
return true;
}
resultSeen = true;
assert(maybeEndpointList.ok);
const endpointList = maybeEndpointList.value;
assert(
hasMatchingAddress(endpointList, { host: '127.0.0.1', port: 443 })
);
done();
return true;
};
const resolver = resolverManager.createResolver(target, listener, {});
resolver.updateResolution();
@ -554,22 +570,24 @@ describe('Name Resolver', () => {
const target = resolverManager.mapUriDefaultScheme(
parseUri('ipv4:127.0.0.1:50051')!
)!;
const listener: resolverManager.ResolverListener = {
onSuccessfulResolution: (
endpointList: Endpoint[],
serviceConfig: ServiceConfig | null,
serviceConfigError: StatusObject | null
) => {
// Only handle the first resolution result
listener.onSuccessfulResolution = () => {};
assert(
hasMatchingAddress(endpointList, { host: '127.0.0.1', port: 50051 })
);
done();
},
onError: (error: StatusObject) => {
done(new Error(`Failed with status ${error.details}`));
},
let resultSeen = false;
const listener: resolverManager.ResolverListener = (
maybeEndpointList: StatusOr<Endpoint[]>,
attributes: { [key: string]: unknown},
serviceConfig: StatusOr<ServiceConfig> | null,
resolutionNote: string
) => {
if (resultSeen) {
return true;
}
resultSeen = true;
assert(maybeEndpointList.ok);
const endpointList = maybeEndpointList.value;
assert(
hasMatchingAddress(endpointList, { host: '127.0.0.1', port: 50051 })
);
done();
return true;
};
const resolver = resolverManager.createResolver(target, listener, {});
resolver.updateResolution();
@ -578,25 +596,27 @@ describe('Name Resolver', () => {
const target = resolverManager.mapUriDefaultScheme(
parseUri('ipv4:127.0.0.1:50051,127.0.0.1:50052')!
)!;
const listener: resolverManager.ResolverListener = {
onSuccessfulResolution: (
endpointList: Endpoint[],
serviceConfig: ServiceConfig | null,
serviceConfigError: StatusObject | null
) => {
// Only handle the first resolution result
listener.onSuccessfulResolution = () => {};
assert(
hasMatchingAddress(endpointList, { host: '127.0.0.1', port: 50051 })
);
assert(
hasMatchingAddress(endpointList, { host: '127.0.0.1', port: 50052 })
);
done();
},
onError: (error: StatusObject) => {
done(new Error(`Failed with status ${error.details}`));
},
let resultSeen = false;
const listener: resolverManager.ResolverListener = (
maybeEndpointList: StatusOr<Endpoint[]>,
attributes: { [key: string]: unknown},
serviceConfig: StatusOr<ServiceConfig> | null,
resolutionNote: string
) => {
if (resultSeen) {
return true;
}
resultSeen = true;
assert(maybeEndpointList.ok);
const endpointList = maybeEndpointList.value;
assert(
hasMatchingAddress(endpointList, { host: '127.0.0.1', port: 50051 })
);
assert(
hasMatchingAddress(endpointList, { host: '127.0.0.1', port: 50052 })
);
done();
return true;
};
const resolver = resolverManager.createResolver(target, listener, {});
resolver.updateResolution();
@ -605,20 +625,22 @@ describe('Name Resolver', () => {
const target = resolverManager.mapUriDefaultScheme(
parseUri('ipv6:::1')!
)!;
const listener: resolverManager.ResolverListener = {
onSuccessfulResolution: (
endpointList: Endpoint[],
serviceConfig: ServiceConfig | null,
serviceConfigError: StatusObject | null
) => {
// Only handle the first resolution result
listener.onSuccessfulResolution = () => {};
assert(hasMatchingAddress(endpointList, { host: '::1', port: 443 }));
done();
},
onError: (error: StatusObject) => {
done(new Error(`Failed with status ${error.details}`));
},
let resultSeen = false;
const listener: resolverManager.ResolverListener = (
maybeEndpointList: StatusOr<Endpoint[]>,
attributes: { [key: string]: unknown},
serviceConfig: StatusOr<ServiceConfig> | null,
resolutionNote: string
) => {
if (resultSeen) {
return true;
}
resultSeen = true;
assert(maybeEndpointList.ok);
const endpointList = maybeEndpointList.value;
assert(hasMatchingAddress(endpointList, { host: '::1', port: 443 }));
done();
return true;
};
const resolver = resolverManager.createResolver(target, listener, {});
resolver.updateResolution();
@ -627,22 +649,24 @@ describe('Name Resolver', () => {
const target = resolverManager.mapUriDefaultScheme(
parseUri('ipv6:[::1]:50051')!
)!;
const listener: resolverManager.ResolverListener = {
onSuccessfulResolution: (
endpointList: Endpoint[],
serviceConfig: ServiceConfig | null,
serviceConfigError: StatusObject | null
) => {
// Only handle the first resolution result
listener.onSuccessfulResolution = () => {};
assert(
hasMatchingAddress(endpointList, { host: '::1', port: 50051 })
);
done();
},
onError: (error: StatusObject) => {
done(new Error(`Failed with status ${error.details}`));
},
let resultSeen = false;
const listener: resolverManager.ResolverListener = (
maybeEndpointList: StatusOr<Endpoint[]>,
attributes: { [key: string]: unknown},
serviceConfig: StatusOr<ServiceConfig> | null,
resolutionNote: string
) => {
if (resultSeen) {
return true;
}
resultSeen = true;
assert(maybeEndpointList.ok);
const endpointList = maybeEndpointList.value;
assert(
hasMatchingAddress(endpointList, { host: '::1', port: 50051 })
);
done();
return true;
};
const resolver = resolverManager.createResolver(target, listener, {});
resolver.updateResolution();
@ -651,25 +675,27 @@ describe('Name Resolver', () => {
const target = resolverManager.mapUriDefaultScheme(
parseUri('ipv6:[::1]:50051,[::1]:50052')!
)!;
const listener: resolverManager.ResolverListener = {
onSuccessfulResolution: (
endpointList: Endpoint[],
serviceConfig: ServiceConfig | null,
serviceConfigError: StatusObject | null
) => {
// Only handle the first resolution result
listener.onSuccessfulResolution = () => {};
assert(
hasMatchingAddress(endpointList, { host: '::1', port: 50051 })
);
assert(
hasMatchingAddress(endpointList, { host: '::1', port: 50052 })
);
done();
},
onError: (error: StatusObject) => {
done(new Error(`Failed with status ${error.details}`));
},
let resultSeen = false;
const listener: resolverManager.ResolverListener = (
maybeEndpointList: StatusOr<Endpoint[]>,
attributes: { [key: string]: unknown},
serviceConfig: StatusOr<ServiceConfig> | null,
resolutionNote: string
) => {
if (resultSeen) {
return true;
}
resultSeen = true;
assert(maybeEndpointList.ok);
const endpointList = maybeEndpointList.value;
assert(
hasMatchingAddress(endpointList, { host: '::1', port: 50051 })
);
assert(
hasMatchingAddress(endpointList, { host: '::1', port: 50052 })
);
done();
return true;
};
const resolver = resolverManager.createResolver(target, listener, {});
resolver.updateResolution();