Merge branch 'master' into grpc-js-xds_dependency_manager_watcher_updates

This commit is contained in:
Michael Lumish 2025-03-21 12:43:44 -07:00
commit 4a0f4cf5c8
31 changed files with 1182 additions and 844 deletions

View File

@ -41,6 +41,7 @@ import PickResult = grpc.experimental.PickResult;
import PickResultType = grpc.experimental.PickResultType;
import createChildChannelControlHelper = grpc.experimental.createChildChannelControlHelper;
import parseLoadBalancingConfig = grpc.experimental.parseLoadBalancingConfig;
import StatusOr = grpc.experimental.StatusOr;
import { ChannelOptions } from '@grpc/grpc-js';
grpc_xds.register();
@ -100,12 +101,12 @@ class RpcBehaviorLoadBalancer implements LoadBalancer {
});
this.child = new ChildLoadBalancerHandler(childChannelControlHelper);
}
updateAddressList(endpointList: Endpoint[], lbConfig: TypedLoadBalancingConfig, options: ChannelOptions): void {
updateAddressList(endpointList: StatusOr<Endpoint[]>, lbConfig: TypedLoadBalancingConfig, options: ChannelOptions, resolutionNote: string): boolean {
if (!(lbConfig instanceof RpcBehaviorLoadBalancingConfig)) {
return;
return false;
}
this.latestConfig = lbConfig;
this.child.updateAddressList(endpointList, RPC_BEHAVIOR_CHILD_CONFIG, options);
return this.child.updateAddressList(endpointList, RPC_BEHAVIOR_CHILD_CONFIG, options, resolutionNote);
}
exitIdle(): void {
this.child.exitIdle();

View File

@ -26,6 +26,8 @@ import ChannelControlHelper = experimental.ChannelControlHelper;
import registerLoadBalancerType = experimental.registerLoadBalancerType;
import TypedLoadBalancingConfig = experimental.TypedLoadBalancingConfig;
import parseLoadBalancingConfig = experimental.parseLoadBalancingConfig;
import StatusOr = experimental.StatusOr;
import statusOrFromValue = experimental.statusOrFromValue;
import { XdsConfig } from './xds-dependency-manager';
import { LocalityEndpoint, PriorityChildRaw } from './load-balancer-priority';
import { Locality__Output } from './generated/envoy/config/core/v3/Locality';
@ -205,7 +207,7 @@ function getLeafClusters(xdsConfig: XdsConfig, rootCluster: string, depth = 0):
if (!maybeClusterConfig) {
return [];
}
if (!maybeClusterConfig.success) {
if (!maybeClusterConfig.ok) {
return [rootCluster];
}
if (maybeClusterConfig.value.children.type === 'aggregate') {
@ -240,13 +242,14 @@ export class CdsLoadBalancer implements LoadBalancer {
}
updateAddressList(
endpointList: Endpoint[],
endpointList: StatusOr<Endpoint[]>,
lbConfig: TypedLoadBalancingConfig,
options: ChannelOptions
): void {
options: ChannelOptions,
resolutionNote: string
): boolean {
if (!(lbConfig instanceof CdsLoadBalancingConfig)) {
trace('Discarding address list update with unrecognized config ' + JSON.stringify(lbConfig, undefined, 2));
return;
return false;
}
trace('Received update with config ' + JSON.stringify(lbConfig, undefined, 2));
const xdsConfig = options[XDS_CONFIG_KEY] as XdsConfig;
@ -254,12 +257,12 @@ export class CdsLoadBalancer implements LoadBalancer {
const maybeClusterConfig = xdsConfig.clusters.get(clusterName);
if (!maybeClusterConfig) {
trace('Received update with no config for cluster ' + clusterName);
return;
return false;
}
if (!maybeClusterConfig.success) {
if (!maybeClusterConfig.ok) {
this.childBalancer.destroy();
this.channelControlHelper.updateState(connectivityState.TRANSIENT_FAILURE, new UnavailablePicker(maybeClusterConfig.error), maybeClusterConfig.error.details);
return;
return true;
}
const clusterConfig = maybeClusterConfig.value;
@ -270,8 +273,8 @@ export class CdsLoadBalancer implements LoadBalancer {
} catch (e) {
trace('xDS config parsing failed with error ' + (e as Error).message);
const errorMessage = `xDS config parsing failed with error ${(e as Error).message}`;
this.channelControlHelper.updateState(connectivityState.TRANSIENT_FAILURE, new UnavailablePicker({code: status.UNAVAILABLE, details: errorMessage}), errorMessage);
return;
this.channelControlHelper.updateState(connectivityState.TRANSIENT_FAILURE, new UnavailablePicker({code: status.UNAVAILABLE, details: `${errorMessage} Resolution note: ${resolutionNote}`}), errorMessage);
return true;
}
const priorityChildren: {[name: string]: PriorityChildRaw} = {};
for (const cluster of leafClusters) {
@ -296,16 +299,16 @@ export class CdsLoadBalancer implements LoadBalancer {
} catch (e) {
trace('LB policy config parsing failed with error ' + (e as Error).message);
const errorMessage = `LB policy config parsing failed with error ${(e as Error).message}`;
this.channelControlHelper.updateState(connectivityState.TRANSIENT_FAILURE, new UnavailablePicker({code: status.UNAVAILABLE, details: errorMessage}), errorMessage);
return;
this.channelControlHelper.updateState(connectivityState.TRANSIENT_FAILURE, new UnavailablePicker({code: status.UNAVAILABLE, details: `${errorMessage} Resolution note: ${resolutionNote}`}), errorMessage);
return true;
}
this.childBalancer.updateAddressList(endpointList, typedChildConfig, {...options, [ROOT_CLUSTER_KEY]: clusterName});
this.childBalancer.updateAddressList(endpointList, typedChildConfig, {...options, [ROOT_CLUSTER_KEY]: clusterName}, resolutionNote);
} else {
if (!clusterConfig.children.endpoints) {
trace('Received update with no resolved endpoints for cluster ' + clusterName);
const errorMessage = `Cluster ${clusterName} resolution failed: ${clusterConfig.children.resolutionNote}`;
this.channelControlHelper.updateState(connectivityState.TRANSIENT_FAILURE, new UnavailablePicker({code: status.UNAVAILABLE, details: errorMessage}), errorMessage);
return;
return false;
}
const newPriorityNames: string[] = [];
const newLocalityPriorities = new Map<string, number>();
@ -317,7 +320,7 @@ export class CdsLoadBalancer implements LoadBalancer {
if (AGGREGATE_CLUSTER_BACKWARDS_COMPAT) {
if (typeof options[ROOT_CLUSTER_KEY] === 'string') {
const maybeRootClusterConfig = xdsConfig.clusters.get(options[ROOT_CLUSTER_KEY]);
if (maybeRootClusterConfig?.success) {
if (maybeRootClusterConfig?.ok) {
endpointPickingPolicy = maybeRootClusterConfig.value.cluster.lbPolicyConfig;
}
}
@ -409,9 +412,9 @@ export class CdsLoadBalancer implements LoadBalancer {
typedChildConfig = parseLoadBalancingConfig(childConfig);
} catch (e) {
trace('LB policy config parsing failed with error ' + (e as Error).message);
const errorMessage = `LB policy config parsing failed with error ${(e as Error).message}`;
const errorMessage = `LB policy config parsing failed with error ${(e as Error).message}. Resolution note: ${resolutionNote}`;
this.channelControlHelper.updateState(connectivityState.TRANSIENT_FAILURE, new UnavailablePicker({code: status.UNAVAILABLE, details: errorMessage}), errorMessage);
return;
return false;
}
const childOptions: ChannelOptions = {...options};
if (clusterConfig.cluster.securityUpdate) {
@ -419,16 +422,16 @@ export class CdsLoadBalancer implements LoadBalancer {
const xdsClient = options[XDS_CLIENT_KEY] as XdsClient;
const caCertProvider = xdsClient.getCertificateProvider(securityUpdate.caCertificateProviderInstance);
if (!caCertProvider) {
const errorMessage = `Cluster ${clusterName} configured with CA certificate provider ${securityUpdate.caCertificateProviderInstance} not in bootstrap`;
const errorMessage = `Cluster ${clusterName} configured with CA certificate provider ${securityUpdate.caCertificateProviderInstance} not in bootstrap. Resolution note: ${resolutionNote}`;
this.channelControlHelper.updateState(connectivityState.TRANSIENT_FAILURE, new UnavailablePicker({code: status.UNAVAILABLE, details: errorMessage}), errorMessage);
return;
return false;
}
if (securityUpdate.identityCertificateProviderInstance) {
const identityCertProvider = xdsClient.getCertificateProvider(securityUpdate.identityCertificateProviderInstance);
if (!identityCertProvider) {
const errorMessage = `Cluster ${clusterName} configured with identity certificate provider ${securityUpdate.identityCertificateProviderInstance} not in bootstrap`;
const errorMessage = `Cluster ${clusterName} configured with identity certificate provider ${securityUpdate.identityCertificateProviderInstance} not in bootstrap. Resolution note: ${resolutionNote}`;
this.channelControlHelper.updateState(connectivityState.TRANSIENT_FAILURE, new UnavailablePicker({code: status.UNAVAILABLE, details: errorMessage}), errorMessage);
return;
return false;
}
childOptions[IDENTITY_CERT_PROVIDER_KEY] = identityCertProvider;
}
@ -440,8 +443,9 @@ export class CdsLoadBalancer implements LoadBalancer {
trace('Configured subject alternative name matcher: ' + sanMatcher);
childOptions[SAN_MATCHER_KEY] = this.latestSanMatcher;
}
this.childBalancer.updateAddressList(childEndpointList, typedChildConfig, childOptions);
this.childBalancer.updateAddressList(statusOrFromValue(childEndpointList), typedChildConfig, childOptions, resolutionNote);
}
return true;
}
exitIdle(): void {
this.childBalancer.exitIdle();

View File

@ -27,6 +27,8 @@ import QueuePicker = experimental.QueuePicker;
import UnavailablePicker = experimental.UnavailablePicker;
import ChildLoadBalancerHandler = experimental.ChildLoadBalancerHandler;
import selectLbConfigFromList = experimental.selectLbConfigFromList;
import StatusOr = experimental.StatusOr;
import statusOrFromValue = experimental.statusOrFromValue;
import { Locality__Output } from './generated/envoy/config/core/v3/Locality';
const TRACER_NAME = 'priority';
@ -155,9 +157,10 @@ class PriorityLoadBalancingConfig implements TypedLoadBalancingConfig {
interface PriorityChildBalancer {
updateAddressList(
endpointList: Endpoint[],
endpointList: StatusOr<Endpoint[]>,
lbConfig: TypedLoadBalancingConfig,
attributes: { [key: string]: unknown }
attributes: { [key: string]: unknown },
resolutionNote: string
): void;
exitIdle(): void;
resetBackoff(): void;
@ -240,11 +243,12 @@ export class PriorityLoadBalancer implements LoadBalancer {
}
updateAddressList(
endpointList: Endpoint[],
endpointList: StatusOr<Endpoint[]>,
lbConfig: TypedLoadBalancingConfig,
attributes: { [key: string]: unknown }
attributes: { [key: string]: unknown },
resolutionNote: string
): void {
this.childBalancer.updateAddressList(endpointList, lbConfig, attributes);
this.childBalancer.updateAddressList(endpointList, lbConfig, attributes, resolutionNote);
}
exitIdle() {
@ -332,6 +336,8 @@ export class PriorityLoadBalancer implements LoadBalancer {
private updatesPaused = false;
private latestResolutionNote: string = '';
constructor(private channelControlHelper: ChannelControlHelper) {}
private updateState(state: ConnectivityState, picker: Picker, errorMessage: string | null) {
@ -401,9 +407,10 @@ export class PriorityLoadBalancer implements LoadBalancer {
child = new this.PriorityChildImpl(this, childName, childUpdate.ignoreReresolutionRequests);
this.children.set(childName, child);
child.updateAddressList(
childUpdate.subchannelAddress,
statusOrFromValue(childUpdate.subchannelAddress),
childUpdate.lbConfig,
this.latestOptions
this.latestOptions,
this.latestResolutionNote
);
} else {
/* We're going to try to use this child, so reactivate it if it has been
@ -440,14 +447,21 @@ export class PriorityLoadBalancer implements LoadBalancer {
}
updateAddressList(
endpointList: Endpoint[],
endpointList: StatusOr<Endpoint[]>,
lbConfig: TypedLoadBalancingConfig,
options: ChannelOptions
): void {
options: ChannelOptions,
resolutionNote: string
): boolean {
if (!(lbConfig instanceof PriorityLoadBalancingConfig)) {
// Reject a config of the wrong type
trace('Discarding address list update with unrecognized config ' + JSON.stringify(lbConfig.toJsonObject(), undefined, 2));
return;
return false;
}
if (!endpointList.ok) {
if (this.latestUpdates.size === 0) {
this.updateState(ConnectivityState.TRANSIENT_FAILURE, new UnavailablePicker(endpointList.error), endpointList.error.details);
}
return true;
}
/* For each address, the first element of its localityPath array determines
* which child it belongs to. So we bucket those addresses by that first
@ -457,14 +471,14 @@ export class PriorityLoadBalancer implements LoadBalancer {
string,
LocalityEndpoint[]
>();
for (const endpoint of endpointList) {
for (const endpoint of endpointList.value) {
if (!isLocalityEndpoint(endpoint)) {
// Reject address that cannot be prioritized
return;
return false;
}
if (endpoint.localityPath.length < 1) {
// Reject address that cannot be prioritized
return;
return false;
}
const childName = endpoint.localityPath[0];
const childAddress: LocalityEndpoint = {
@ -495,9 +509,10 @@ export class PriorityLoadBalancer implements LoadBalancer {
const existingChild = this.children.get(childName);
if (existingChild !== undefined) {
existingChild.updateAddressList(
childAddresses,
statusOrFromValue(childAddresses),
childConfig.config,
options
options,
resolutionNote
);
}
}
@ -509,7 +524,9 @@ export class PriorityLoadBalancer implements LoadBalancer {
}
}
this.updatesPaused = false;
this.latestResolutionNote = resolutionNote;
this.choosePriority();
return true;
}
exitIdle(): void {
if (this.currentPriority !== null) {

View File

@ -31,6 +31,7 @@ import UnavailablePicker = experimental.UnavailablePicker;
import subchannelAddressToString = experimental.subchannelAddressToString;
import registerLoadBalancerType = experimental.registerLoadBalancerType;
import EndpointMap = experimental.EndpointMap;
import StatusOr = experimental.StatusOr;
import { loadXxhashApi, xxhashApi } from './xxhash';
import { EXPERIMENTAL_RING_HASH } from './environment';
import { loadProtosWithOptionsSync } from '@grpc/proto-loader/build/src/util';
@ -401,26 +402,44 @@ class RingHashLoadBalancer implements LoadBalancer {
}
updateAddressList(
endpointList: Endpoint[],
endpointList: StatusOr<Endpoint[]>,
lbConfig: TypedLoadBalancingConfig,
options: ChannelOptions
): void {
options: ChannelOptions,
resolutionNote: string
): boolean {
if (!(lbConfig instanceof RingHashLoadBalancingConfig)) {
trace('Discarding address update with unrecognized config ' + JSON.stringify(lbConfig.toJsonObject(), undefined, 2));
return;
return false;
}
if (!endpointList.ok) {
if (this.ring.length === 0) {
this.updateState(connectivityState.TRANSIENT_FAILURE, new UnavailablePicker(endpointList.error), endpointList.error.details);
}
return true;
}
if (endpointList.value.length === 0) {
for (const ringEntry of this.ring) {
ringEntry.leafBalancer.destroy();
}
this.ring = [];
this.leafMap.clear();
this.leafWeightMap.clear();
const errorMessage = `No addresses resolved. Resolution note: ${resolutionNote}`;
this.updateState(connectivityState.TRANSIENT_FAILURE, new UnavailablePicker({code: status.UNAVAILABLE, details: errorMessage}), errorMessage);
return false;
}
trace('Received update with config ' + JSON.stringify(lbConfig.toJsonObject(), undefined, 2));
this.updatesPaused = true;
this.leafWeightMap.clear();
const dedupedEndpointList: Endpoint[] = [];
for (const endpoint of endpointList) {
for (const endpoint of endpointList.value) {
const leafBalancer = this.leafMap.get(endpoint);
if (leafBalancer) {
leafBalancer.updateEndpoint(endpoint, options);
} else {
this.leafMap.set(
endpoint,
new LeafLoadBalancer(endpoint, this.childChannelControlHelper, options)
new LeafLoadBalancer(endpoint, this.childChannelControlHelper, options, resolutionNote)
);
}
const weight = this.leafWeightMap.get(endpoint);
@ -429,7 +448,7 @@ class RingHashLoadBalancer implements LoadBalancer {
}
this.leafWeightMap.set(endpoint, (weight ?? 0) + (isLocalityEndpoint(endpoint) ? endpoint.endpointWeight : 1));
}
const removedLeaves = this.leafMap.deleteMissing(endpointList);
const removedLeaves = this.leafMap.deleteMissing(endpointList.value);
for (const leaf of removedLeaves) {
leaf.destroy();
}
@ -440,6 +459,7 @@ class RingHashLoadBalancer implements LoadBalancer {
this.calculateAndUpdateState();
this.maybeProactivelyConnect();
});
return true;
}
exitIdle(): void {
/* This operation does not make sense here. We don't want to make the whole

View File

@ -15,7 +15,7 @@
*
*/
import { connectivityState as ConnectivityState, status as Status, Metadata, logVerbosity, experimental, LoadBalancingConfig, ChannelOptions } from "@grpc/grpc-js";
import { connectivityState as ConnectivityState, status as Status, Metadata, logVerbosity, experimental, LoadBalancingConfig, ChannelOptions, connectivityState, status } from "@grpc/grpc-js";
import { isLocalityEndpoint, LocalityEndpoint } from "./load-balancer-priority";
import TypedLoadBalancingConfig = experimental.TypedLoadBalancingConfig;
import LoadBalancer = experimental.LoadBalancer;
@ -30,6 +30,8 @@ import UnavailablePicker = experimental.UnavailablePicker;
import Endpoint = experimental.Endpoint;
import endpointToString = experimental.endpointToString;
import selectLbConfigFromList = experimental.selectLbConfigFromList;
import StatusOr = experimental.StatusOr;
import statusOrFromValue = experimental.statusOrFromValue;
const TRACER_NAME = 'weighted_target';
@ -154,7 +156,7 @@ class WeightedTargetPicker implements Picker {
}
interface WeightedChild {
updateAddressList(endpointList: Endpoint[], lbConfig: WeightedTarget, attributes: { [key: string]: unknown; }): void;
updateAddressList(endpointList: StatusOr<Endpoint[]>, lbConfig: WeightedTarget, options: ChannelOptions, resolutionNote: string): void;
exitIdle(): void;
resetBackoff(): void;
destroy(): void;
@ -193,9 +195,9 @@ export class WeightedTargetLoadBalancer implements LoadBalancer {
this.parent.maybeUpdateState();
}
updateAddressList(endpointList: Endpoint[], lbConfig: WeightedTarget, options: ChannelOptions): void {
updateAddressList(endpointList: StatusOr<Endpoint[]>, lbConfig: WeightedTarget, options: ChannelOptions, resolutionNote: string): void {
this.weight = lbConfig.weight;
this.childBalancer.updateAddressList(endpointList, lbConfig.child_policy, options);
this.childBalancer.updateAddressList(endpointList, lbConfig.child_policy, options, resolutionNote);
}
exitIdle(): void {
this.childBalancer.exitIdle();
@ -325,26 +327,41 @@ export class WeightedTargetLoadBalancer implements LoadBalancer {
this.channelControlHelper.updateState(connectivityState, picker, errorMessage);
}
updateAddressList(addressList: Endpoint[], lbConfig: TypedLoadBalancingConfig, options: ChannelOptions): void {
updateAddressList(addressList: StatusOr<Endpoint[]>, lbConfig: TypedLoadBalancingConfig, options: ChannelOptions, resolutionNote: string): boolean {
if (!(lbConfig instanceof WeightedTargetLoadBalancingConfig)) {
// Reject a config of the wrong type
trace('Discarding address list update with unrecognized config ' + JSON.stringify(lbConfig.toJsonObject(), undefined, 2));
return;
return false;
}
if (!addressList.ok) {
if (this.targets.size === 0) {
this.channelControlHelper.updateState(connectivityState.TRANSIENT_FAILURE, new UnavailablePicker(addressList.error), addressList.error.details);
}
return true;
}
if (addressList.value.length === 0) {
for (const target of this.targets.values()) {
target.destroy();
}
this.targets.clear();
this.targetList = [];
const errorMessage = `No addresses resolved. Resolution note: ${resolutionNote}`;
this.channelControlHelper.updateState(connectivityState.TRANSIENT_FAILURE, new UnavailablePicker({code: status.UNAVAILABLE, details: errorMessage}), errorMessage);
return false;
}
/* For each address, the first element of its localityPath array determines
* which child it belongs to. So we bucket those addresses by that first
* element, and pass along the rest of the localityPath for that child
* to use. */
const childEndpointMap = new Map<string, LocalityEndpoint[]>();
for (const address of addressList) {
for (const address of addressList.value) {
if (!isLocalityEndpoint(address)) {
// Reject address that cannot be associated with targets
return;
return false;
}
if (address.localityPath.length < 1) {
// Reject address that cannot be associated with targets
return;
return false;
}
const childName = address.localityPath[0];
const childAddress: LocalityEndpoint = {
@ -371,7 +388,7 @@ export class WeightedTargetLoadBalancer implements LoadBalancer {
}
const targetEndpoints = childEndpointMap.get(targetName) ?? [];
trace('Assigning target ' + targetName + ' address list ' + targetEndpoints.map(endpoint => '(' + endpointToString(endpoint) + ' path=' + endpoint.localityPath + ')'));
target.updateAddressList(targetEndpoints, targetConfig, options);
target.updateAddressList(statusOrFromValue(targetEndpoints), targetConfig, options, resolutionNote);
}
// Deactivate targets that are not in the new config
@ -384,6 +401,7 @@ export class WeightedTargetLoadBalancer implements LoadBalancer {
this.updatesPaused = false;
this.updateState();
return true;
}
exitIdle(): void {
for (const targetName of this.targetList) {

View File

@ -37,6 +37,7 @@ import selectLbConfigFromList = experimental.selectLbConfigFromList;
import SubchannelInterface = experimental.SubchannelInterface;
import BaseSubchannelWrapper = experimental.BaseSubchannelWrapper;
import UnavailablePicker = experimental.UnavailablePicker;
import StatusOr = experimental.StatusOr;
import { Locality__Output } from "./generated/envoy/config/core/v3/Locality";
import { ClusterConfig, XdsConfig } from "./xds-dependency-manager";
import { CdsUpdate } from "./xds-resource-type/cluster-resource-type";
@ -206,7 +207,7 @@ function getCallCounterMapKey(cluster: string, edsServiceName?: string): string
class XdsClusterImplBalancer implements LoadBalancer {
private childBalancer: ChildLoadBalancerHandler;
private lastestEndpointList: Endpoint[] | null = null;
private lastestEndpointList: StatusOr<Endpoint[]> | null = null;
private latestConfig: XdsClusterImplLoadBalancingConfig | null = null;
private clusterDropStats: XdsClusterDropStats | null = null;
private xdsClient: XdsClient | null = null;
@ -215,12 +216,12 @@ class XdsClusterImplBalancer implements LoadBalancer {
constructor(private readonly channelControlHelper: ChannelControlHelper) {
this.childBalancer = new ChildLoadBalancerHandler(createChildChannelControlHelper(channelControlHelper, {
createSubchannel: (subchannelAddress, subchannelArgs) => {
if (!this.xdsClient || !this.latestConfig || !this.lastestEndpointList || !this.latestClusterConfig) {
throw new Error('xds_cluster_impl: invalid state: createSubchannel called with xdsClient or latestConfig not populated');
if (!this.xdsClient || !this.latestConfig || !this.lastestEndpointList || !this.lastestEndpointList.ok || !this.latestClusterConfig) {
throw new Error('xds_cluster_impl: invalid state: createSubchannel called with xdsClient or latestConfig not populated or with resolver error');
}
const wrapperChild = channelControlHelper.createSubchannel(subchannelAddress, subchannelArgs);
let locality: Locality__Output | null = null;
for (const endpoint of this.lastestEndpointList) {
for (const endpoint of this.lastestEndpointList.value) {
if (endpointHasAddress(endpoint, subchannelAddress)) {
locality = (endpoint as LocalityEndpoint).locality;
}
@ -251,28 +252,28 @@ class XdsClusterImplBalancer implements LoadBalancer {
}
}));
}
updateAddressList(endpointList: Endpoint[], lbConfig: TypedLoadBalancingConfig, options: ChannelOptions): void {
updateAddressList(endpointList: StatusOr<Endpoint[]>, lbConfig: TypedLoadBalancingConfig, options: ChannelOptions, resolutionNote: string): boolean {
if (!(lbConfig instanceof XdsClusterImplLoadBalancingConfig)) {
trace('Discarding address list update with unrecognized config ' + JSON.stringify(lbConfig.toJsonObject(), undefined, 2));
return;
return false;
}
trace('Received update with config: ' + JSON.stringify(lbConfig.toJsonObject(), undefined, 2));
const xdsConfig = options[XDS_CONFIG_KEY] as XdsConfig;
const maybeClusterConfig = xdsConfig.clusters.get(lbConfig.getCluster());
if (!maybeClusterConfig) {
trace('Received update with no config for cluster ' + lbConfig.getCluster());
return;
return false;
}
if (!maybeClusterConfig.success) {
if (!maybeClusterConfig.ok) {
this.latestClusterConfig = null;
this.childBalancer.destroy();
this.channelControlHelper.updateState(connectivityState.TRANSIENT_FAILURE, new UnavailablePicker(maybeClusterConfig.error), maybeClusterConfig.error.details);
return;
return false;
}
const clusterConfig = maybeClusterConfig.value;
if (clusterConfig.children.type === 'aggregate') {
trace('Received update for aggregate cluster ' + lbConfig.getCluster());
return;
return false;
}
if (!clusterConfig.children.endpoints) {
this.childBalancer.destroy();
@ -291,7 +292,8 @@ class XdsClusterImplBalancer implements LoadBalancer {
);
}
this.childBalancer.updateAddressList(endpointList, lbConfig.getChildPolicy(), options);
this.childBalancer.updateAddressList(endpointList, lbConfig.getChildPolicy(), options, resolutionNote);
return true;
}
exitIdle(): void {
this.childBalancer.exitIdle();

View File

@ -30,6 +30,7 @@ import ChildLoadBalancerHandler = experimental.ChildLoadBalancerHandler;
import ChannelControlHelper = experimental.ChannelControlHelper;
import selectLbConfigFromList = experimental.selectLbConfigFromList;
import registerLoadBalancerType = experimental.registerLoadBalancerType;
import StatusOr = experimental.StatusOr;
const TRACER_NAME = 'xds_cluster_manager';
@ -111,7 +112,7 @@ class XdsClusterManagerPicker implements Picker {
}
interface XdsClusterManagerChild {
updateAddressList(endpointList: Endpoint[], childConfig: TypedLoadBalancingConfig, attributes: { [key: string]: unknown; }): void;
updateAddressList(endpointList: StatusOr<Endpoint[]>, childConfig: TypedLoadBalancingConfig, options: ChannelOptions, resolutionNote: string): void;
exitIdle(): void;
resetBackoff(): void;
destroy(): void;
@ -145,8 +146,8 @@ class XdsClusterManager implements LoadBalancer {
}
this.parent.maybeUpdateState();
}
updateAddressList(endpointList: Endpoint[], childConfig: TypedLoadBalancingConfig, options: ChannelOptions): void {
this.childBalancer.updateAddressList(endpointList, childConfig, options);
updateAddressList(endpointList: StatusOr<Endpoint[]>, childConfig: TypedLoadBalancingConfig, options: ChannelOptions, resolutionNote: string): void {
this.childBalancer.updateAddressList(endpointList, childConfig, options, resolutionNote);
}
exitIdle(): void {
this.childBalancer.exitIdle();
@ -213,11 +214,11 @@ class XdsClusterManager implements LoadBalancer {
this.channelControlHelper.updateState(connectivityState, new XdsClusterManagerPicker(pickerMap), errorMessage);
}
updateAddressList(endpointList: Endpoint[], lbConfig: TypedLoadBalancingConfig, options: ChannelOptions): void {
updateAddressList(endpointList: StatusOr<Endpoint[]>, lbConfig: TypedLoadBalancingConfig, options: ChannelOptions, resolutionNote: string): boolean {
if (!(lbConfig instanceof XdsClusterManagerLoadBalancingConfig)) {
// Reject a config of the wrong type
trace('Discarding address list update with unrecognized config ' + JSON.stringify(lbConfig.toJsonObject(), undefined, 2));
return;
return false;;
}
trace('Received update with config: ' + JSON.stringify(lbConfig.toJsonObject(), undefined, 2));
const configChildren = lbConfig.getChildren();
@ -240,10 +241,11 @@ class XdsClusterManager implements LoadBalancer {
child = new this.XdsClusterManagerChildImpl(this, name);
this.children.set(name, child);
}
child.updateAddressList(endpointList, childConfig, options);
child.updateAddressList(endpointList, childConfig, options, resolutionNote);
}
this.updatesPaused = false;
this.updateState();
return true;
}
exitIdle(): void {
for (const child of this.children.values()) {

View File

@ -29,6 +29,7 @@ import ChildLoadBalancerHandler = experimental.ChildLoadBalancerHandler;
import Endpoint = experimental.Endpoint;
import parseLoadBalancingConfig = experimental.parseLoadBalancingConfig;
import registerLoadBalancerType = experimental.registerLoadBalancerType;
import StatusOr = experimental.StatusOr;
import { Any__Output } from "./generated/google/protobuf/Any";
import { WrrLocality__Output } from "./generated/envoy/extensions/load_balancing_policies/wrr_locality/v3/WrrLocality";
import { TypedExtensionConfig__Output } from "./generated/envoy/config/core/v3/TypedExtensionConfig";
@ -76,15 +77,19 @@ class XdsWrrLocalityLoadBalancer implements LoadBalancer {
constructor(private readonly channelControlHelper: ChannelControlHelper) {
this.childBalancer = new ChildLoadBalancerHandler(channelControlHelper);
}
updateAddressList(endpointList: Endpoint[], lbConfig: TypedLoadBalancingConfig, options: ChannelOptions): void {
updateAddressList(endpointList: StatusOr<Endpoint[]>, lbConfig: TypedLoadBalancingConfig, options: ChannelOptions, resolutionNote: string): boolean {
if (!(lbConfig instanceof XdsWrrLocalityLoadBalancingConfig)) {
trace('Discarding address list update with unrecognized config ' + JSON.stringify(lbConfig, undefined, 2));
return;
return false;
}
if (!endpointList.ok) {
this.childBalancer.updateAddressList(endpointList, parseLoadBalancingConfig({weighted_target: { targets: [] }}), options, resolutionNote);
return true;
}
const targets: {[localityName: string]: WeightedTargetRaw} = {};
for (const address of endpointList) {
for (const address of endpointList.value) {
if (!isLocalityEndpoint(address)) {
return;
return false;
}
const localityName = localityToName(address.locality);
if (!(localityName in targets)) {
@ -99,7 +104,8 @@ class XdsWrrLocalityLoadBalancer implements LoadBalancer {
targets: targets
}
};
this.childBalancer.updateAddressList(endpointList, parseLoadBalancingConfig(childConfig), options);
this.childBalancer.updateAddressList(endpointList, parseLoadBalancingConfig(childConfig), options, resolutionNote);
return true;
}
exitIdle(): void {
this.childBalancer.exitIdle();

View File

@ -41,6 +41,9 @@ import { loadXxhashApi } from './xxhash';
import { formatTemplateString } from './xds-bootstrap';
import { getPredicateForMatcher } from './route';
import { XdsConfig, XdsConfigWatcher, XdsDependencyManager } from './xds-dependency-manager';
import statusOrFromValue = experimental.statusOrFromValue;
import statusOrFromError = experimental.statusOrFromError;
import CHANNEL_ARGS_CONFIG_SELECTOR_KEY = experimental.CHANNEL_ARGS_CONFIG_SELECTOR_KEY;
const TRACER_NAME = 'xds_resolver';
@ -133,11 +136,10 @@ class XdsResolver implements Resolver {
}
this.xdsConfigWatcher = {
onUpdate: maybeXdsConfig => {
if (maybeXdsConfig.success) {
if (maybeXdsConfig.ok) {
this.handleXdsConfig(maybeXdsConfig.value);
} else {
// This should be in the resolution_note once that is implemented
this.reportResolutionError(`Resolution error for target ${uriToString(this.target)}: ${maybeXdsConfig.error.details}`);
this.listener(statusOrFromValue([]), {}, null, `Resolution error for target ${uriToString(this.target)}: ${maybeXdsConfig.error.details}`);
}
}
}
@ -402,20 +404,20 @@ class XdsResolver implements Resolver {
methodConfig: [],
loadBalancingConfig: [lbPolicyConfig]
}
this.listener.onSuccessfulResolution([], serviceConfig, null, configSelector, {
this.listener(statusOrFromValue([]), {
[XDS_CLIENT_KEY]: this.xdsClient,
[XDS_CONFIG_KEY]: xdsConfig
});
[XDS_CONFIG_KEY]: xdsConfig,
[CHANNEL_ARGS_CONFIG_SELECTOR_KEY]: configSelector
}, statusOrFromValue(serviceConfig), '');
}
private reportResolutionError(reason: string) {
this.listener.onError({
this.listener(statusOrFromError({
code: status.UNAVAILABLE,
details: `xDS name resolution failed for target ${uriToString(
this.target
)}: ${reason}`,
metadata: new Metadata(),
});
)}: ${reason}`
}), {}, null, '');
}
private startResolution(): void {

View File

@ -25,6 +25,7 @@ import { DropCategory } from "./load-balancer-xds-cluster-impl";
import Endpoint = experimental.Endpoint;
import Resolver = experimental.Resolver;
import createResolver = experimental.createResolver;
import StatusOr = experimental.StatusOr;
import { decodeSingleResource, HTTP_CONNECTION_MANGER_TYPE_URL } from "./resources";
import { RouteConfigurationResourceType } from "./xds-resource-type/route-config-resource-type";
import { ListenerResourceType } from "./xds-resource-type/listener-resource-type";
@ -75,14 +76,6 @@ export interface ClusterConfig {
children: EndpointConfig | AggregateConfig;
}
export type StatusOr<T> = {
success: true;
value: T
} | {
success: false;
error: StatusObject;
}
export interface ClusterResult {
clusterConfig?: ClusterConfig;
status?: StatusObject;
@ -157,7 +150,7 @@ function isClusterTreeFullyUpdated(tree: ClusterGraph, roots: string[]): Cluster
reason: 'Cluster entry ' + next + ' not updated'
};
}
if (tree[next].latestUpdate.success) {
if (tree[next].latestUpdate.ok) {
if (tree[next].latestUpdate.value.type !== 'AGGREGATE') {
if (!(tree[next].latestUpdate.value.latestUpdate)) {
return {
@ -400,7 +393,7 @@ export class XdsDependencyManager {
if (!this.latestListener) {
this.trace('Resolution error due to xDS client transient error ' + error.details);
this.watcher.onUpdate({
success: false,
ok: false,
error: {
...error,
details: `Listener ${listenerResourceName}: ${error.details}`
@ -420,7 +413,7 @@ export class XdsDependencyManager {
onError: (error: StatusObject) => {
if (!this.latestRouteConfiguration) {
this.watcher.onUpdate({
success: false,
ok: false,
error: {
...error,
details: `RouteConfiguration ${this.latestRouteConfigName}: ${error.details}`
@ -430,7 +423,7 @@ export class XdsDependencyManager {
},
onResourceDoesNotExist: () => {
this.watcher.onUpdate({
success: false,
ok: false,
error: {
code: status.UNAVAILABLE,
details: `RouteConfiguration ${this.latestRouteConfigName} does not exist`,
@ -459,7 +452,7 @@ export class XdsDependencyManager {
this.pruneOrphanClusters();
}
this.watcher.onUpdate({
success: false,
ok: false,
error: {
code: status.UNAVAILABLE,
details: `Listener ${this.listenerResourceName} does not exist`,
@ -493,7 +486,7 @@ export class XdsDependencyManager {
this.trace('Not sending update: Cluster entry ' + clusterName + ' not updated (not caught by isClusterTreeFullyUpdated)');
return;
}
if (entry.latestUpdate.success) {
if (entry.latestUpdate.ok) {
let clusterChildren: EndpointConfig | AggregateConfig;
if (entry.latestUpdate.value.type === 'AGGREGATE') {
clusterChildren = {
@ -508,7 +501,7 @@ export class XdsDependencyManager {
};
}
update.clusters.set(clusterName, {
success: true,
ok: true,
value: {
cluster: entry.latestUpdate.value.cdsUpdate,
children: clusterChildren
@ -516,12 +509,12 @@ export class XdsDependencyManager {
});
} else {
update.clusters.set(clusterName, {
success: false,
ok: false,
error: entry.latestUpdate.error
});
}
}
this.watcher.onUpdate({success: true, value: update});
this.watcher.onUpdate({ok: true, value: update});
}
private addCluster(clusterName: string) {
@ -533,7 +526,7 @@ export class XdsDependencyManager {
onResourceChanged: (update: CdsUpdate) => {
switch (update.type) {
case 'AGGREGATE':
if (entry.latestUpdate?.success) {
if (entry.latestUpdate?.ok) {
switch (entry.latestUpdate.value.type) {
case 'AGGREGATE':
break;
@ -548,7 +541,7 @@ export class XdsDependencyManager {
}
entry.children = update.aggregateChildren;
entry.latestUpdate = {
success: true,
ok: true,
value: {
type: 'AGGREGATE',
cdsUpdate: update
@ -562,7 +555,7 @@ export class XdsDependencyManager {
break;
case 'EDS':
const edsServiceName = update.edsServiceName ?? clusterName;
if (entry.latestUpdate?.success) {
if (entry.latestUpdate?.ok) {
switch (entry.latestUpdate.value.type) {
case 'AGGREGATE':
entry.children = [];
@ -589,14 +582,14 @@ export class XdsDependencyManager {
}
const edsWatcher = new Watcher<ClusterLoadAssignment__Output>({
onResourceChanged: (endpoint: ClusterLoadAssignment__Output) => {
if (entry.latestUpdate?.success && entry.latestUpdate.value.type === 'EDS') {
if (entry.latestUpdate?.ok && entry.latestUpdate.value.type === 'EDS') {
entry.latestUpdate.value.latestUpdate = getEdsResource(endpoint);
entry.latestUpdate.value.resolutionNote = undefined;
this.maybeSendUpdate();
}
},
onError: error => {
if (entry.latestUpdate?.success && entry.latestUpdate.value.type === 'EDS') {
if (entry.latestUpdate?.ok && entry.latestUpdate.value.type === 'EDS') {
if (!entry.latestUpdate.value.latestUpdate) {
entry.latestUpdate.value.resolutionNote = `Control plane error: ${error.details}`;
this.maybeSendUpdate();
@ -604,7 +597,7 @@ export class XdsDependencyManager {
}
},
onResourceDoesNotExist: () => {
if (entry.latestUpdate?.success && entry.latestUpdate.value.type === 'EDS') {
if (entry.latestUpdate?.ok && entry.latestUpdate.value.type === 'EDS') {
entry.latestUpdate.value.resolutionNote = 'Resource does not exist';
entry.latestUpdate.value.latestUpdate = undefined;
this.maybeSendUpdate();
@ -612,7 +605,7 @@ export class XdsDependencyManager {
}
});
entry.latestUpdate = {
success: true,
ok: true,
value: {
type: 'EDS',
cdsUpdate: update,
@ -625,7 +618,7 @@ export class XdsDependencyManager {
this.maybeSendUpdate();
break;
case 'LOGICAL_DNS': {
if (entry.latestUpdate?.success) {
if (entry.latestUpdate?.ok) {
switch (entry.latestUpdate.value.type) {
case 'AGGREGATE':
entry.children = [];
@ -644,24 +637,24 @@ export class XdsDependencyManager {
}
}
this.trace('Creating DNS resolver for hostname ' + update.dnsHostname!);
const resolver = createResolver({scheme: 'dns', path: update.dnsHostname!}, {
onSuccessfulResolution: endpointList => {
if (entry.latestUpdate?.success && entry.latestUpdate.value.type === 'LOGICAL_DNS') {
entry.latestUpdate.value.latestUpdate = getDnsResource(endpointList);
const resolver = createResolver({scheme: 'dns', path: update.dnsHostname!}, endpointList => {
if (endpointList.ok) {
if (entry.latestUpdate?.ok && entry.latestUpdate.value.type === 'LOGICAL_DNS') {
entry.latestUpdate.value.latestUpdate = getDnsResource(endpointList.value);
this.maybeSendUpdate();
}
},
onError: error => {
if (entry.latestUpdate?.success && entry.latestUpdate.value.type === 'LOGICAL_DNS') {
} else {
if (entry.latestUpdate?.ok && entry.latestUpdate.value.type === 'LOGICAL_DNS') {
if (!entry.latestUpdate.value.latestUpdate) {
entry.latestUpdate.value.resolutionNote = `DNS resolution error: ${error.details}`;
entry.latestUpdate.value.resolutionNote = `DNS resolution error: ${endpointList.error.details}`;
this.maybeSendUpdate();
}
}
}
return true;
}, {'grpc.service_config_disable_resolution': 1});
entry.latestUpdate = {
success: true,
ok: true,
value: {
type: 'LOGICAL_DNS',
cdsUpdate: update,
@ -676,16 +669,16 @@ export class XdsDependencyManager {
}
},
onError: error => {
if (!entry.latestUpdate?.success) {
if (!entry.latestUpdate?.ok) {
entry.latestUpdate = {
success: false,
ok: false,
error: error
};
this.maybeSendUpdate();
}
},
onResourceDoesNotExist: () => {
if (entry.latestUpdate?.success) {
if (entry.latestUpdate?.ok) {
switch (entry.latestUpdate.value.type) {
case 'EDS':
this.trace('EDS.cancelWatch(' + entry.latestUpdate.value.edsServiceName + '): CDS resource does not exist');
@ -699,7 +692,7 @@ export class XdsDependencyManager {
}
}
entry.latestUpdate = {
success: false,
ok: false,
error: {
code: status.UNAVAILABLE,
details: `Cluster resource ${clusterName} does not exist`,
@ -741,7 +734,7 @@ export class XdsDependencyManager {
return;
}
const entry = this.clusterForest[clusterName];
if (entry.latestUpdate?.success) {
if (entry.latestUpdate?.ok) {
switch (entry.latestUpdate.value.type) {
case 'EDS':
this.trace('EDS.cancelWatch(' + entry.latestUpdate.value.edsServiceName + '): Cluster ' + clusterName + ' removed');
@ -794,7 +787,7 @@ export class XdsDependencyManager {
this.clusterRoots = [];
this.pruneOrphanClusters();
this.watcher.onUpdate({
success: false,
ok: false,
error: {
code: status.UNAVAILABLE,
details: `RouteConfiguration ${routeConfig.name}: No matching route found for ${this.dataPlaneAuthority}`,
@ -832,7 +825,7 @@ export class XdsDependencyManager {
updateResolution() {
for (const clusterEntry of Object.values(this.clusterForest)) {
if (clusterEntry.latestUpdate?.success && clusterEntry.latestUpdate.value.type === 'LOGICAL_DNS') {
if (clusterEntry.latestUpdate?.ok && clusterEntry.latestUpdate.value.type === 'LOGICAL_DNS') {
clusterEntry.latestUpdate.value.resolver.updateResolution();
}
}

View File

@ -38,6 +38,7 @@ import PickResultType = experimental.PickResultType;
import createChildChannelControlHelper = experimental.createChildChannelControlHelper;
import parseLoadBalancingConfig = experimental.parseLoadBalancingConfig;
import registerLoadBalancerType = experimental.registerLoadBalancerType;
import StatusOr = experimental.StatusOr;
import { PickFirst } from "../src/generated/envoy/extensions/load_balancing_policies/pick_first/v3/PickFirst";
const LB_POLICY_NAME = 'test.RpcBehaviorLoadBalancer';
@ -95,12 +96,12 @@ class RpcBehaviorLoadBalancer implements LoadBalancer {
});
this.child = new ChildLoadBalancerHandler(childChannelControlHelper);
}
updateAddressList(endpointList: Endpoint[], lbConfig: TypedLoadBalancingConfig, options: ChannelOptions): void {
updateAddressList(endpointList: StatusOr<Endpoint[]>, lbConfig: TypedLoadBalancingConfig, options: ChannelOptions, resolutionNote: string): boolean {
if (!(lbConfig instanceof RpcBehaviorLoadBalancingConfig)) {
return;
return false;
}
this.latestConfig = lbConfig;
this.child.updateAddressList(endpointList, RPC_BEHAVIOR_CHILD_CONFIG, options);
return this.child.updateAddressList(endpointList, RPC_BEHAVIOR_CHILD_CONFIG, options, resolutionNote);
}
exitIdle(): void {
this.child.exitIdle();

View File

@ -41,6 +41,35 @@ export type PartialStatusObject = Pick<StatusObject, 'code' | 'details'> & {
metadata?: Metadata | null | undefined;
};
export interface StatusOrOk<T> {
ok: true;
value: T;
}
export interface StatusOrError {
ok: false;
error: StatusObject;
}
export type StatusOr<T> = StatusOrOk<T> | StatusOrError;
export function statusOrFromValue<T>(value: T): StatusOr<T> {
return {
ok: true,
value: value
};
}
export function statusOrFromError<T>(error: PartialStatusObject): StatusOr<T> {
return {
ok: false,
error: {
...error,
metadata: error.metadata ?? new Metadata()
}
};
}
export const enum WriteFlags {
BufferHint = 1,
NoCompress = 2,

View File

@ -5,6 +5,7 @@ export {
registerResolver,
ConfigSelector,
createResolver,
CHANNEL_ARGS_CONFIG_SELECTOR_KEY,
} from './resolver';
export { GrpcUri, uriToString, splitHostPort, HostPort } from './uri-parser';
export { Duration, durationToMs, parseDuration } from './duration';
@ -37,7 +38,12 @@ export {
PickArgs,
PickResultType,
} from './picker';
export { Call as CallStream } from './call-interface';
export {
Call as CallStream,
StatusOr,
statusOrFromValue,
statusOrFromError
} from './call-interface';
export { Filter, BaseFilter, FilterFactory } from './filter';
export { FilterStackFactory } from './filter-stack';
export { registerAdminService } from './admin';

View File

@ -27,6 +27,7 @@ import { ConnectivityState } from './connectivity-state';
import { Picker } from './picker';
import type { ChannelRef, SubchannelRef } from './channelz';
import { SubchannelInterface } from './subchannel-interface';
import { StatusOr } from './call-interface';
const TYPE_NAME = 'child_load_balancer_helper';
@ -102,10 +103,11 @@ export class ChildLoadBalancerHandler {
* @param attributes
*/
updateAddressList(
endpointList: Endpoint[],
endpointList: StatusOr<Endpoint[]>,
lbConfig: TypedLoadBalancingConfig,
options: ChannelOptions
): void {
options: ChannelOptions,
resolutionNote: string
): boolean {
let childToUpdate: LoadBalancer;
if (
this.currentChild === null ||
@ -133,7 +135,7 @@ export class ChildLoadBalancerHandler {
}
}
this.latestConfig = lbConfig;
childToUpdate.updateAddressList(endpointList, lbConfig, options);
return childToUpdate.updateAddressList(endpointList, lbConfig, options, resolutionNote);
}
exitIdle(): void {
if (this.currentChild) {

View File

@ -43,6 +43,7 @@ import {
} from './subchannel-interface';
import * as logging from './logging';
import { LoadBalancingConfig } from './service-config';
import { StatusOr } from './call-interface';
const TRACER_NAME = 'outlier_detection';
@ -757,28 +758,31 @@ export class OutlierDetectionLoadBalancer implements LoadBalancer {
}
updateAddressList(
endpointList: Endpoint[],
endpointList: StatusOr<Endpoint[]>,
lbConfig: TypedLoadBalancingConfig,
options: ChannelOptions
): void {
options: ChannelOptions,
resolutionNote: string
): boolean {
if (!(lbConfig instanceof OutlierDetectionLoadBalancingConfig)) {
return;
return false;
}
trace('Received update with config: ' + JSON.stringify(lbConfig.toJsonObject(), undefined, 2))
for (const endpoint of endpointList) {
if (!this.entryMap.has(endpoint)) {
trace('Adding map entry for ' + endpointToString(endpoint));
this.entryMap.set(endpoint, {
counter: new CallCounter(),
currentEjectionTimestamp: null,
ejectionTimeMultiplier: 0,
subchannelWrappers: [],
});
trace('Received update with config: ' + JSON.stringify(lbConfig.toJsonObject(), undefined, 2));
if (endpointList.ok) {
for (const endpoint of endpointList.value) {
if (!this.entryMap.has(endpoint)) {
trace('Adding map entry for ' + endpointToString(endpoint));
this.entryMap.set(endpoint, {
counter: new CallCounter(),
currentEjectionTimestamp: null,
ejectionTimeMultiplier: 0,
subchannelWrappers: [],
});
}
}
this.entryMap.deleteMissing(endpointList.value);
}
this.entryMap.deleteMissing(endpointList);
const childPolicy = lbConfig.getChildPolicy();
this.childBalancer.updateAddressList(endpointList, childPolicy, options);
this.childBalancer.updateAddressList(endpointList, childPolicy, options, resolutionNote);
if (
lbConfig.getSuccessRateEjectionConfig() ||
@ -808,6 +812,7 @@ export class OutlierDetectionLoadBalancer implements LoadBalancer {
}
this.latestConfig = lbConfig;
return true;
}
exitIdle(): void {
this.childBalancer.exitIdle();

View File

@ -43,6 +43,7 @@ import {
import { isTcpSubchannelAddress } from './subchannel-address';
import { isIPv6 } from 'net';
import { ChannelOptions } from './channel-options';
import { StatusOr, statusOrFromValue } from './call-interface';
const TRACER_NAME = 'pick_first';
@ -236,6 +237,8 @@ export class PickFirstLoadBalancer implements LoadBalancer {
private latestOptions: ChannelOptions = {};
private latestResolutionNote: string = '';
/**
* Load balancer that attempts to connect to each backend in the address list
* in order, and picks the first one that connects, using it for every
@ -277,7 +280,7 @@ export class PickFirstLoadBalancer implements LoadBalancer {
);
}
} else if (this.latestAddressList?.length === 0) {
const errorMessage = `No connection established. Last error: ${this.lastError}`;
const errorMessage = `No connection established. Last error: ${this.lastError}. Resolution note: ${this.latestResolutionNote}`;
this.updateState(
ConnectivityState.TRANSIENT_FAILURE,
new UnavailablePicker({
@ -289,7 +292,7 @@ export class PickFirstLoadBalancer implements LoadBalancer {
this.updateState(ConnectivityState.IDLE, new QueuePicker(this), null);
} else {
if (this.stickyTransientFailureMode) {
const errorMessage = `No connection established. Last error: ${this.lastError}`;
const errorMessage = `No connection established. Last error: ${this.lastError}. Resolution note: ${this.latestResolutionNote}`;
this.updateState(
ConnectivityState.TRANSIENT_FAILURE,
new UnavailablePicker({
@ -505,13 +508,25 @@ export class PickFirstLoadBalancer implements LoadBalancer {
}
updateAddressList(
endpointList: Endpoint[],
maybeEndpointList: StatusOr<Endpoint[]>,
lbConfig: TypedLoadBalancingConfig,
options: ChannelOptions
): void {
options: ChannelOptions,
resolutionNote: string
): boolean {
if (!(lbConfig instanceof PickFirstLoadBalancingConfig)) {
return;
return false;
}
if (!maybeEndpointList.ok) {
if (this.children.length === 0 && this.currentPick === null) {
this.channelControlHelper.updateState(
ConnectivityState.TRANSIENT_FAILURE,
new UnavailablePicker(maybeEndpointList.error),
maybeEndpointList.error.details
);
}
return true;
}
let endpointList = maybeEndpointList.value;
this.reportHealthStatus = options[REPORT_HEALTH_STATUS_OPTION_NAME];
/* Previously, an update would be discarded if it was identical to the
* previous update, to minimize churn. Now the DNS resolver is
@ -523,13 +538,17 @@ export class PickFirstLoadBalancer implements LoadBalancer {
...endpointList.map(endpoint => endpoint.addresses)
);
trace('updateAddressList([' + rawAddressList.map(address => subchannelAddressToString(address)) + '])');
if (rawAddressList.length === 0) {
this.lastError = 'No addresses resolved';
}
const addressList = interleaveAddressFamilies(rawAddressList);
this.latestAddressList = addressList;
this.latestOptions = options;
this.connectToAddressList(addressList, options);
this.latestResolutionNote = resolutionNote;
if (rawAddressList.length > 0) {
return true;
} else {
this.lastError = 'No addresses resolved';
return false;
}
}
exitIdle() {
@ -570,7 +589,8 @@ export class LeafLoadBalancer {
constructor(
private endpoint: Endpoint,
channelControlHelper: ChannelControlHelper,
private options: ChannelOptions
private options: ChannelOptions,
private resolutionNote: string
) {
const childChannelControlHelper = createChildChannelControlHelper(
channelControlHelper,
@ -590,9 +610,10 @@ export class LeafLoadBalancer {
startConnecting() {
this.pickFirstBalancer.updateAddressList(
[this.endpoint],
statusOrFromValue([this.endpoint]),
LEAF_CONFIG,
{ ...this.options, [REPORT_HEALTH_STATUS_OPTION_NAME]: true }
{ ...this.options, [REPORT_HEALTH_STATUS_OPTION_NAME]: true },
this.resolutionNote
);
}

View File

@ -39,6 +39,7 @@ import {
} from './subchannel-address';
import { LeafLoadBalancer } from './load-balancer-pick-first';
import { ChannelOptions } from './channel-options';
import { StatusOr } from './call-interface';
const TRACER_NAME = 'round_robin';
@ -205,14 +206,38 @@ export class RoundRobinLoadBalancer implements LoadBalancer {
for (const child of this.children) {
child.destroy();
}
this.children = [];
}
updateAddressList(
endpointList: Endpoint[],
maybeEndpointList: StatusOr<Endpoint[]>,
lbConfig: TypedLoadBalancingConfig,
options: ChannelOptions
): void {
options: ChannelOptions,
resolutionNote: string
): boolean {
if (!(lbConfig instanceof RoundRobinLoadBalancingConfig)) {
return false;
}
if (!maybeEndpointList.ok) {
if (this.children.length === 0) {
this.updateState(
ConnectivityState.TRANSIENT_FAILURE,
new UnavailablePicker(maybeEndpointList.error),
maybeEndpointList.error.details
);
}
return true;
}
const endpointList = maybeEndpointList.value;
this.resetSubchannelList();
if (endpointList.length === 0) {
const errorMessage = `No addresses resolved. Resolution note: ${resolutionNote}`;
this.updateState(
ConnectivityState.TRANSIENT_FAILURE,
new UnavailablePicker({details: errorMessage}),
errorMessage
);
}
trace('Connect to endpoint list ' + endpointList.map(endpointToString));
this.updatesPaused = true;
this.children = endpointList.map(
@ -220,7 +245,8 @@ export class RoundRobinLoadBalancer implements LoadBalancer {
new LeafLoadBalancer(
endpoint,
this.childChannelControlHelper,
options
options,
resolutionNote
)
);
for (const child of this.children) {
@ -228,6 +254,7 @@ export class RoundRobinLoadBalancer implements LoadBalancer {
}
this.updatesPaused = false;
this.calculateAndUpdateState();
return true;
}
exitIdle(): void {

View File

@ -24,6 +24,7 @@ import { SubchannelInterface } from './subchannel-interface';
import { LoadBalancingConfig } from './service-config';
import { log } from './logging';
import { LogVerbosity } from './constants';
import { StatusOr } from './call-interface';
/**
* A collection of functions associated with a channel that a load balancer
@ -102,12 +103,16 @@ export interface LoadBalancer {
* @param endpointList The new list of addresses to connect to
* @param lbConfig The load balancing config object from the service config,
* if one was provided
* @param channelOptions Channel options from the channel, plus resolver
* attributes
* @param resolutionNote A not from the resolver to include in errors
*/
updateAddressList(
endpointList: Endpoint[],
endpointList: StatusOr<Endpoint[]>,
lbConfig: TypedLoadBalancingConfig,
channelOptions: ChannelOptions
): void;
channelOptions: ChannelOptions,
resolutionNote: string
): boolean;
/**
* If the load balancer is currently in the IDLE state, start connecting.
*/

View File

@ -23,7 +23,7 @@ import {
import { promises as dns } from 'dns';
import { extractAndSelectServiceConfig, ServiceConfig } from './service-config';
import { Status } from './constants';
import { StatusObject } from './call-interface';
import { StatusObject, StatusOr, statusOrFromError, statusOrFromValue } from './call-interface';
import { Metadata } from './metadata';
import * as logging from './logging';
import { LogVerbosity } from './constants';
@ -62,9 +62,8 @@ class DnsResolver implements Resolver {
private readonly minTimeBetweenResolutionsMs: number;
private pendingLookupPromise: Promise<TcpSubchannelAddress[]> | null = null;
private pendingTxtPromise: Promise<string[][]> | null = null;
private latestLookupResult: Endpoint[] | null = null;
private latestServiceConfig: ServiceConfig | null = null;
private latestServiceConfigError: StatusObject | null = null;
private latestLookupResult: StatusOr<Endpoint[]> | null = null;
private latestServiceConfigResult: StatusOr<ServiceConfig> | null = null;
private percentage: number;
private defaultResolutionError: StatusObject;
private backoff: BackoffTimeout;
@ -149,13 +148,12 @@ class DnsResolver implements Resolver {
if (!this.returnedIpResult) {
trace('Returning IP address for target ' + uriToString(this.target));
setImmediate(() => {
this.listener.onSuccessfulResolution(
this.ipResult!,
this.listener(
statusOrFromValue(this.ipResult!),
{},
null,
null,
null,
{}
);
''
)
});
this.returnedIpResult = true;
}
@ -167,11 +165,15 @@ class DnsResolver implements Resolver {
if (this.dnsHostname === null) {
trace('Failed to parse DNS address ' + uriToString(this.target));
setImmediate(() => {
this.listener.onError({
code: Status.UNAVAILABLE,
details: `Failed to parse DNS address ${uriToString(this.target)}`,
metadata: new Metadata(),
});
this.listener(
statusOrFromError({
code: Status.UNAVAILABLE,
details: `Failed to parse DNS address ${uriToString(this.target)}`
}),
{},
null,
''
);
});
this.stopNextResolutionTimer();
} else {
@ -194,11 +196,9 @@ class DnsResolver implements Resolver {
return;
}
this.pendingLookupPromise = null;
this.backoff.reset();
this.backoff.stop();
this.latestLookupResult = addressList.map(address => ({
this.latestLookupResult = statusOrFromValue(addressList.map(address => ({
addresses: [address],
}));
})));
const allAddressesString: string =
'[' +
addressList.map(addr => addr.host + ':' + addr.port).join(',') +
@ -209,21 +209,17 @@ class DnsResolver implements Resolver {
': ' +
allAddressesString
);
if (this.latestLookupResult.length === 0) {
this.listener.onError(this.defaultResolutionError);
return;
}
/* If the TXT lookup has not yet finished, both of the last two
* arguments will be null, which is the equivalent of getting an
* empty TXT response. When the TXT lookup does finish, its handler
* can update the service config by using the same address list */
this.listener.onSuccessfulResolution(
const healthStatus = this.listener(
this.latestLookupResult,
this.latestServiceConfig,
this.latestServiceConfigError,
null,
{}
{},
this.latestServiceConfigResult,
''
);
this.handleHealthStatus(healthStatus);
},
err => {
if (this.pendingLookupPromise === null) {
@ -237,7 +233,12 @@ class DnsResolver implements Resolver {
);
this.pendingLookupPromise = null;
this.stopNextResolutionTimer();
this.listener.onError(this.defaultResolutionError);
this.listener(
statusOrFromError(this.defaultResolutionError),
{},
this.latestServiceConfigResult,
''
)
}
);
/* If there already is a still-pending TXT resolution, we can just use
@ -253,31 +254,35 @@ class DnsResolver implements Resolver {
return;
}
this.pendingTxtPromise = null;
let serviceConfig: ServiceConfig | null;
try {
this.latestServiceConfig = extractAndSelectServiceConfig(
serviceConfig = extractAndSelectServiceConfig(
txtRecord,
this.percentage
);
if (serviceConfig) {
this.latestServiceConfigResult = statusOrFromValue(serviceConfig);
} else {
this.latestServiceConfigResult = null;
}
} catch (err) {
this.latestServiceConfigError = {
this.latestServiceConfigResult = statusOrFromError({
code: Status.UNAVAILABLE,
details: `Parsing service config failed with error ${
(err as Error).message
}`,
metadata: new Metadata(),
};
}`
});
}
if (this.latestLookupResult !== null) {
/* We rely here on the assumption that calling this function with
* identical parameters will be essentialy idempotent, and calling
* it with the same address list and a different service config
* should result in a fast and seamless switchover. */
this.listener.onSuccessfulResolution(
this.listener(
this.latestLookupResult,
this.latestServiceConfig,
this.latestServiceConfigError,
null,
{}
{},
this.latestServiceConfigResult,
''
);
}
},
@ -295,6 +300,21 @@ class DnsResolver implements Resolver {
}
}
/**
* The ResolverListener returns a boolean indicating whether the LB policy
* accepted the resolution result. A false result on an otherwise successful
* resolution should be treated as a resolution failure.
* @param healthStatus
*/
private handleHealthStatus(healthStatus: boolean) {
if (healthStatus) {
this.backoff.stop();
this.backoff.reset();
} else {
this.continueResolving = true;
}
}
private async lookup(hostname: string): Promise<TcpSubchannelAddress[]> {
if (GRPC_NODE_USE_ALTERNATIVE_RESOLVER) {
trace('Using alternative DNS resolver.');
@ -400,8 +420,7 @@ class DnsResolver implements Resolver {
this.pendingLookupPromise = null;
this.pendingTxtPromise = null;
this.latestLookupResult = null;
this.latestServiceConfig = null;
this.latestServiceConfigError = null;
this.latestServiceConfigResult = null;
this.returnedIpResult = false;
}

View File

@ -15,7 +15,7 @@
*/
import { isIPv4, isIPv6 } from 'net';
import { StatusObject } from './call-interface';
import { StatusObject, statusOrFromError, statusOrFromValue } from './call-interface';
import { ChannelOptions } from './channel-options';
import { LogVerbosity, Status } from './constants';
import { Metadata } from './metadata';
@ -92,14 +92,18 @@ class IpResolver implements Resolver {
this.hasReturnedResult = true;
process.nextTick(() => {
if (this.error) {
this.listener.onError(this.error);
this.listener(
statusOrFromError(this.error),
{},
null,
''
);
} else {
this.listener.onSuccessfulResolution(
this.endpoints,
this.listener(
statusOrFromValue(this.endpoints),
{},
null,
null,
null,
{}
''
);
}
});

View File

@ -18,6 +18,7 @@ import { Resolver, ResolverListener, registerResolver } from './resolver';
import { Endpoint } from './subchannel-address';
import { GrpcUri } from './uri-parser';
import { ChannelOptions } from './channel-options';
import { statusOrFromValue } from './call-interface';
class UdsResolver implements Resolver {
private hasReturnedResult = false;
@ -39,12 +40,11 @@ class UdsResolver implements Resolver {
if (!this.hasReturnedResult) {
this.hasReturnedResult = true;
process.nextTick(
this.listener.onSuccessfulResolution,
this.endpoints,
this.listener,
statusOrFromValue(this.endpoints),
{},
null,
null,
null,
{}
''
);
}
}

View File

@ -16,7 +16,7 @@
*/
import { MethodConfig, ServiceConfig } from './service-config';
import { StatusObject } from './call-interface';
import { StatusOr } from './call-interface';
import { Endpoint } from './subchannel-address';
import { GrpcUri, uriToString } from './uri-parser';
import { ChannelOptions } from './channel-options';
@ -24,6 +24,8 @@ import { Metadata } from './metadata';
import { Status } from './constants';
import { Filter, FilterFactory } from './filter';
export const CHANNEL_ARGS_CONFIG_SELECTOR_KEY = 'grpc.internal.config_selector';
export interface CallConfig {
methodConfig: MethodConfig;
onCommitted?: () => void;
@ -41,34 +43,27 @@ export interface ConfigSelector {
unref(): void;
}
/**
* A listener object passed to the resolver's constructor that provides name
* resolution updates back to the resolver's owner.
*/
export interface ResolverListener {
/**
* Called whenever the resolver has new name resolution results to report
* @param addressList The new list of backend addresses
* @param serviceConfig The new service configuration corresponding to the
* `addressList`. Will be `null` if no service configuration was
* retrieved or if the service configuration was invalid
* @param serviceConfigError If non-`null`, indicates that the retrieved
* service configuration was invalid
* Called whenever the resolver has new name resolution results or an error to
* report.
* @param endpointList The list of endpoints, or an error if resolution failed
* @param attributes Arbitrary key/value pairs to pass along to load balancing
* policies
* @param serviceConfig The service service config for the endpoint list, or an
* error if the retrieved service config is invalid, or null if there is no
* service config
* @param resolutionNote Provides additional context to RPC failure status
* messages generated by the load balancing policy.
* @returns Whether or not the load balancing policy accepted the result.
*/
onSuccessfulResolution(
addressList: Endpoint[],
serviceConfig: ServiceConfig | null,
serviceConfigError: StatusObject | null,
configSelector: ConfigSelector | null,
attributes: { [key: string]: unknown }
): void;
/**
* Called whenever a name resolution attempt fails.
* @param error Describes how resolution failed
*/
onError(error: StatusObject): void;
(
endpointList: StatusOr<Endpoint[]>,
attributes: { [key: string]: unknown },
serviceConfig: StatusOr<ServiceConfig> | null,
resolutionNote: string
): boolean;
}
/**
* A resolver class that handles one or more of the name syntax schemes defined
* in the [gRPC Name Resolution document](https://github.com/grpc/grpc/blob/master/doc/naming.md)

View File

@ -27,12 +27,11 @@ import {
validateServiceConfig,
} from './service-config';
import { ConnectivityState } from './connectivity-state';
import { ConfigSelector, createResolver, Resolver } from './resolver';
import { ServiceError } from './call';
import { CHANNEL_ARGS_CONFIG_SELECTOR_KEY, ConfigSelector, createResolver, Resolver } from './resolver';
import { Picker, UnavailablePicker, QueuePicker } from './picker';
import { BackoffOptions, BackoffTimeout } from './backoff-timeout';
import { Status } from './constants';
import { StatusObject } from './call-interface';
import { StatusObject, StatusOr } from './call-interface';
import { Metadata } from './metadata';
import * as logging from './logging';
import { LogVerbosity } from './constants';
@ -251,75 +250,7 @@ export class ResolvingLoadBalancer implements LoadBalancer {
);
this.innerResolver = createResolver(
target,
{
onSuccessfulResolution: (
endpointList: Endpoint[],
serviceConfig: ServiceConfig | null,
serviceConfigError: ServiceError | null,
configSelector: ConfigSelector | null,
attributes: { [key: string]: unknown }
) => {
this.backoffTimeout.stop();
this.backoffTimeout.reset();
let workingServiceConfig: ServiceConfig | null = null;
/* This first group of conditionals implements the algorithm described
* in https://github.com/grpc/proposal/blob/master/A21-service-config-error-handling.md
* in the section called "Behavior on receiving a new gRPC Config".
*/
if (serviceConfig === null) {
// Step 4 and 5
if (serviceConfigError === null) {
// Step 5
this.previousServiceConfig = null;
workingServiceConfig = this.defaultServiceConfig;
} else {
// Step 4
if (this.previousServiceConfig === null) {
// Step 4.ii
this.handleResolutionFailure(serviceConfigError);
} else {
// Step 4.i
workingServiceConfig = this.previousServiceConfig;
}
}
} else {
// Step 3
workingServiceConfig = serviceConfig;
this.previousServiceConfig = serviceConfig;
}
const workingConfigList =
workingServiceConfig?.loadBalancingConfig ?? [];
const loadBalancingConfig = selectLbConfigFromList(
workingConfigList,
true
);
if (loadBalancingConfig === null) {
// There were load balancing configs but none are supported. This counts as a resolution failure
this.handleResolutionFailure({
code: Status.UNAVAILABLE,
details:
'All load balancer options in service config are not compatible',
metadata: new Metadata(),
});
configSelector?.unref();
return;
}
this.childLoadBalancer.updateAddressList(
endpointList,
loadBalancingConfig,
{...this.channelOptions, ...attributes}
);
const finalServiceConfig =
workingServiceConfig ?? this.defaultServiceConfig;
this.onSuccessfulResolution(
finalServiceConfig,
configSelector ?? getDefaultConfigSelector(finalServiceConfig)
);
},
onError: (error: StatusObject) => {
this.handleResolutionFailure(error);
},
},
this.handleResolverResult.bind(this),
channelOptions
);
const backoffOptions: BackoffOptions = {
@ -337,6 +268,62 @@ export class ResolvingLoadBalancer implements LoadBalancer {
this.backoffTimeout.unref();
}
private handleResolverResult(
endpointList: StatusOr<Endpoint[]>,
attributes: { [key: string]: unknown },
serviceConfig: StatusOr<ServiceConfig> | null,
resolutionNote: string
): boolean {
this.backoffTimeout.stop();
this.backoffTimeout.reset();
let resultAccepted = true;
let workingServiceConfig: ServiceConfig | null = null;
if (serviceConfig === null) {
workingServiceConfig = this.defaultServiceConfig;
} else if (serviceConfig.ok) {
workingServiceConfig = serviceConfig.value;
} else {
if (this.previousServiceConfig !== null) {
workingServiceConfig = this.previousServiceConfig;
} else {
resultAccepted = false;
this.handleResolutionFailure(serviceConfig.error);
}
}
if (workingServiceConfig !== null) {
const workingConfigList =
workingServiceConfig?.loadBalancingConfig ?? [];
const loadBalancingConfig = selectLbConfigFromList(
workingConfigList,
true
);
if (loadBalancingConfig === null) {
resultAccepted = false;
this.handleResolutionFailure({
code: Status.UNAVAILABLE,
details:
'All load balancer options in service config are not compatible',
metadata: new Metadata(),
});
} else {
resultAccepted = this.childLoadBalancer.updateAddressList(
endpointList,
loadBalancingConfig,
{...this.channelOptions, ...attributes},
resolutionNote
);
}
}
if (resultAccepted) {
this.onSuccessfulResolution(
workingServiceConfig!,
attributes[CHANNEL_ARGS_CONFIG_SELECTOR_KEY] as ConfigSelector ?? getDefaultConfigSelector(workingServiceConfig!)
);
}
return resultAccepted;
}
private updateResolution() {
this.innerResolver.updateResolution();
if (this.currentState === ConnectivityState.IDLE) {
@ -391,7 +378,7 @@ export class ResolvingLoadBalancer implements LoadBalancer {
}
updateAddressList(
endpointList: Endpoint[],
endpointList: StatusOr<Endpoint[]>,
lbConfig: TypedLoadBalancingConfig | null
): never {
throw new Error('updateAddressList not supported on ResolvingLoadBalancer');

View File

@ -301,6 +301,13 @@ const defaultResponder: FullResponder = {
},
};
export interface ConnectionInfo {
localAddress?: string | undefined;
localPort?: number | undefined;
remoteAddress?: string | undefined;
remotePort?: number | undefined;
}
export interface ServerInterceptingCallInterface {
/**
* Register the listener to handle inbound events.
@ -338,6 +345,10 @@ export interface ServerInterceptingCallInterface {
* Return the auth context of the connection the call is associated with.
*/
getAuthContext(): AuthContext;
/**
* Return information about the connection used to make the call.
*/
getConnectionInfo(): ConnectionInfo;
}
export class ServerInterceptingCall implements ServerInterceptingCallInterface {
@ -449,6 +460,9 @@ export class ServerInterceptingCall implements ServerInterceptingCallInterface {
getAuthContext(): AuthContext {
return this.nextCall.getAuthContext();
}
getConnectionInfo(): ConnectionInfo {
return this.nextCall.getConnectionInfo();
}
}
export interface ServerInterceptor {
@ -519,6 +533,7 @@ export class BaseServerInterceptingCall
private receivedHalfClose = false;
private streamEnded = false;
private host: string;
private connectionInfo: ConnectionInfo;
constructor(
private readonly stream: http2.ServerHttp2Stream,
@ -606,6 +621,14 @@ export class BaseServerInterceptingCall
metadata.remove(http2.constants.HTTP2_HEADER_TE);
metadata.remove(http2.constants.HTTP2_HEADER_CONTENT_TYPE);
this.metadata = metadata;
const socket = stream.session?.socket;
this.connectionInfo = {
localAddress: socket?.localAddress,
localPort: socket?.localPort,
remoteAddress: socket?.remoteAddress,
remotePort: socket?.remotePort
};
}
private handleTimeoutHeader(timeoutHeader: string) {
@ -990,6 +1013,9 @@ export class BaseServerInterceptingCall
return {};
}
}
getConnectionInfo(): ConnectionInfo {
return this.connectionInfo;
}
}
export function getServerInterceptingCall(

View File

@ -782,27 +782,31 @@ export class Server {
private resolvePort(port: GrpcUri): Promise<SubchannelAddress[]> {
return new Promise<SubchannelAddress[]>((resolve, reject) => {
const resolverListener: ResolverListener = {
onSuccessfulResolution: (
endpointList,
serviceConfig,
serviceConfigError
) => {
// We only want one resolution result. Discard all future results
resolverListener.onSuccessfulResolution = () => {};
const addressList = ([] as SubchannelAddress[]).concat(
...endpointList.map(endpoint => endpoint.addresses)
);
if (addressList.length === 0) {
reject(new Error(`No addresses resolved for port ${port}`));
return;
}
resolve(addressList);
},
onError: error => {
reject(new Error(error.details));
},
};
let seenResolution = false;
const resolverListener: ResolverListener = (
endpointList,
attributes,
serviceConfig,
resolutionNote
) => {
if (seenResolution) {
return true;
}
seenResolution = true;
if (!endpointList.ok) {
reject(new Error(endpointList.error.details));
return true;
}
const addressList = ([] as SubchannelAddress[]).concat(
...endpointList.value.map(endpoint => endpoint.addresses)
);
if (addressList.length === 0) {
reject(new Error(`No addresses resolved for port ${port}`));
return true;
}
resolve(addressList);
return true;
}
const resolver = createResolver(port, resolverListener, this.options);
resolver.updateResolution();
});

View File

@ -150,15 +150,15 @@ export class TestClient {
this.client.waitForReady(deadline, callback);
}
sendRequest(callback: (error?: grpc.ServiceError) => void) {
this.client.echo({}, callback);
sendRequest(callback: (error?: grpc.ServiceError) => void): grpc.ClientUnaryCall {
return this.client.echo({}, callback);
}
sendRequestWithMetadata(
metadata: grpc.Metadata,
callback: (error?: grpc.ServiceError) => void
) {
this.client.echo({}, metadata, callback);
): grpc.ClientUnaryCall {
return this.client.echo({}, metadata, callback);
}
getChannelState() {

View File

@ -31,6 +31,8 @@ import { Metadata } from '../src/metadata';
import { Picker } from '../src/picker';
import { Endpoint, subchannelAddressToString } from '../src/subchannel-address';
import { MockSubchannel, TestClient, TestServer } from './common';
import { statusOrFromError, statusOrFromValue } from '../src/call-interface';
import { Status } from '../src/constants';
function updateStateCallBackForExpectedStateSequence(
expectedStateSequence: ConnectivityState[],
@ -125,9 +127,10 @@ describe('pick_first load balancing policy', () => {
);
const pickFirst = new PickFirstLoadBalancer(channelControlHelper);
pickFirst.updateAddressList(
[{ addresses: [{ host: 'localhost', port: 1 }] }],
statusOrFromValue([{ addresses: [{ host: 'localhost', port: 1 }] }]),
config,
{}
{},
''
);
process.nextTick(() => {
subchannels[0].transitionToState(ConnectivityState.READY);
@ -145,12 +148,13 @@ describe('pick_first load balancing policy', () => {
);
const pickFirst = new PickFirstLoadBalancer(channelControlHelper);
pickFirst.updateAddressList(
[
statusOrFromValue([
{ addresses: [{ host: 'localhost', port: 1 }] },
{ addresses: [{ host: 'localhost', port: 2 }] },
],
]),
config,
{}
{},
''
);
process.nextTick(() => {
subchannels[1].transitionToState(ConnectivityState.READY);
@ -168,16 +172,17 @@ describe('pick_first load balancing policy', () => {
);
const pickFirst = new PickFirstLoadBalancer(channelControlHelper);
pickFirst.updateAddressList(
[
statusOrFromValue([
{
addresses: [
{ host: 'localhost', port: 1 },
{ host: 'localhost', port: 2 },
],
},
],
]),
config,
{}
{},
''
);
process.nextTick(() => {
subchannels[1].transitionToState(ConnectivityState.READY);
@ -203,9 +208,10 @@ describe('pick_first load balancing policy', () => {
);
const pickFirst = new PickFirstLoadBalancer(channelControlHelper);
pickFirst.updateAddressList(
[{ addresses: [{ host: 'localhost', port: 1 }] }],
statusOrFromValue([{ addresses: [{ host: 'localhost', port: 1 }] }]),
config,
{}
{},
''
);
});
it('Should stay CONNECTING if only some subchannels fail to connect', done => {
@ -220,12 +226,13 @@ describe('pick_first load balancing policy', () => {
);
const pickFirst = new PickFirstLoadBalancer(channelControlHelper);
pickFirst.updateAddressList(
[
statusOrFromValue([
{ addresses: [{ host: 'localhost', port: 1 }] },
{ addresses: [{ host: 'localhost', port: 2 }] },
],
]),
config,
{}
{},
''
);
process.nextTick(() => {
subchannels[0].transitionToState(ConnectivityState.TRANSIENT_FAILURE);
@ -243,12 +250,13 @@ describe('pick_first load balancing policy', () => {
);
const pickFirst = new PickFirstLoadBalancer(channelControlHelper);
pickFirst.updateAddressList(
[
statusOrFromValue([
{ addresses: [{ host: 'localhost', port: 1 }] },
{ addresses: [{ host: 'localhost', port: 2 }] },
],
]),
config,
{}
{},
''
);
process.nextTick(() => {
subchannels[0].transitionToState(ConnectivityState.TRANSIENT_FAILURE);
@ -269,12 +277,13 @@ describe('pick_first load balancing policy', () => {
);
const pickFirst = new PickFirstLoadBalancer(channelControlHelper);
pickFirst.updateAddressList(
[
statusOrFromValue([
{ addresses: [{ host: 'localhost', port: 1 }] },
{ addresses: [{ host: 'localhost', port: 2 }] },
],
]),
config,
{}
{},
''
);
process.nextTick(() => {
subchannels[0].transitionToState(ConnectivityState.TRANSIENT_FAILURE);
@ -309,12 +318,13 @@ describe('pick_first load balancing policy', () => {
);
const pickFirst = new PickFirstLoadBalancer(channelControlHelper);
pickFirst.updateAddressList(
[
statusOrFromValue([
{ addresses: [{ host: 'localhost', port: 1 }] },
{ addresses: [{ host: 'localhost', port: 2 }] },
],
]),
config,
{}
{},
''
);
});
it('Should enter READY if a subchannel connects after entering TRANSIENT_FAILURE mode', done => {
@ -337,12 +347,13 @@ describe('pick_first load balancing policy', () => {
);
const pickFirst = new PickFirstLoadBalancer(channelControlHelper);
pickFirst.updateAddressList(
[
statusOrFromValue([
{ addresses: [{ host: 'localhost', port: 1 }] },
{ addresses: [{ host: 'localhost', port: 2 }] },
],
]),
config,
{}
{},
''
);
process.nextTick(() => {
subchannels[0].transitionToState(ConnectivityState.READY);
@ -369,22 +380,24 @@ describe('pick_first load balancing policy', () => {
);
const pickFirst = new PickFirstLoadBalancer(channelControlHelper);
pickFirst.updateAddressList(
[
statusOrFromValue([
{ addresses: [{ host: 'localhost', port: 1 }] },
{ addresses: [{ host: 'localhost', port: 2 }] },
],
]),
config,
{}
{},
''
);
process.nextTick(() => {
currentStartState = ConnectivityState.CONNECTING;
pickFirst.updateAddressList(
[
statusOrFromValue([
{ addresses: [{ host: 'localhost', port: 1 }] },
{ addresses: [{ host: 'localhost', port: 2 }] },
],
]),
config,
{}
{},
''
);
});
});
@ -409,19 +422,21 @@ describe('pick_first load balancing policy', () => {
);
const pickFirst = new PickFirstLoadBalancer(channelControlHelper);
pickFirst.updateAddressList(
[
statusOrFromValue([
{ addresses: [{ host: 'localhost', port: 1 }] },
{ addresses: [{ host: 'localhost', port: 2 }] },
],
]),
config,
{}
{},
''
);
process.nextTick(() => {
currentStartState = ConnectivityState.READY;
pickFirst.updateAddressList(
[{ addresses: [{ host: 'localhost', port: 3 }] }],
statusOrFromValue([{ addresses: [{ host: 'localhost', port: 3 }] }]),
config,
{}
{},
''
);
});
});
@ -446,9 +461,10 @@ describe('pick_first load balancing policy', () => {
);
const pickFirst = new PickFirstLoadBalancer(channelControlHelper);
pickFirst.updateAddressList(
[{ addresses: [{ host: 'localhost', port: 1 }] }],
statusOrFromValue([{ addresses: [{ host: 'localhost', port: 1 }] }]),
config,
{}
{},
''
);
process.nextTick(() => {
subchannels[0].transitionToState(ConnectivityState.IDLE);
@ -475,16 +491,18 @@ describe('pick_first load balancing policy', () => {
);
const pickFirst = new PickFirstLoadBalancer(channelControlHelper);
pickFirst.updateAddressList(
[{ addresses: [{ host: 'localhost', port: 1 }] }],
statusOrFromValue([{ addresses: [{ host: 'localhost', port: 1 }] }]),
config,
{}
{},
''
);
process.nextTick(() => {
currentStartState = ConnectivityState.IDLE;
pickFirst.updateAddressList(
[{ addresses: [{ host: 'localhost', port: 2 }] }],
statusOrFromValue([{ addresses: [{ host: 'localhost', port: 2 }] }]),
config,
{}
{},
''
);
process.nextTick(() => {
subchannels[0].transitionToState(ConnectivityState.IDLE);
@ -512,16 +530,18 @@ describe('pick_first load balancing policy', () => {
);
const pickFirst = new PickFirstLoadBalancer(channelControlHelper);
pickFirst.updateAddressList(
[{ addresses: [{ host: 'localhost', port: 1 }] }],
statusOrFromValue([{ addresses: [{ host: 'localhost', port: 1 }] }]),
config,
{}
{},
''
);
process.nextTick(() => {
currentStartState = ConnectivityState.TRANSIENT_FAILURE;
pickFirst.updateAddressList(
[{ addresses: [{ host: 'localhost', port: 2 }] }],
statusOrFromValue([{ addresses: [{ host: 'localhost', port: 2 }] }]),
config,
{}
{},
''
);
process.nextTick(() => {
subchannels[0].transitionToState(ConnectivityState.IDLE);
@ -549,15 +569,17 @@ describe('pick_first load balancing policy', () => {
);
const pickFirst = new PickFirstLoadBalancer(channelControlHelper);
pickFirst.updateAddressList(
[{ addresses: [{ host: 'localhost', port: 1 }] }],
statusOrFromValue([{ addresses: [{ host: 'localhost', port: 1 }] }]),
config,
{}
{},
''
);
process.nextTick(() => {
pickFirst.updateAddressList(
[{ addresses: [{ host: 'localhost', port: 2 }] }],
statusOrFromValue([{ addresses: [{ host: 'localhost', port: 2 }] }]),
config,
{}
{},
''
);
process.nextTick(() => {
subchannels[0].transitionToState(ConnectivityState.IDLE);
@ -597,25 +619,28 @@ describe('pick_first load balancing policy', () => {
);
const pickFirst = new PickFirstLoadBalancer(channelControlHelper);
pickFirst.updateAddressList(
[{ addresses: [{ host: 'localhost', port: 1 }] }],
statusOrFromValue([{ addresses: [{ host: 'localhost', port: 1 }] }]),
config,
{}
{},
''
);
process.nextTick(() => {
subchannels[0].transitionToState(ConnectivityState.TRANSIENT_FAILURE);
process.nextTick(() => {
pickFirst.updateAddressList(
[{ addresses: [{ host: 'localhost', port: 2 }] }],
statusOrFromValue([{ addresses: [{ host: 'localhost', port: 2 }] }]),
config,
{}
{},
''
);
process.nextTick(() => {
subchannels[1].transitionToState(ConnectivityState.TRANSIENT_FAILURE);
process.nextTick(() => {
pickFirst.updateAddressList(
[{ addresses: [{ host: 'localhost', port: 3 }] }],
statusOrFromValue([{ addresses: [{ host: 'localhost', port: 3 }] }]),
config,
{}
{},
''
);
process.nextTick(() => {
subchannels[2].transitionToState(
@ -660,21 +685,24 @@ describe('pick_first load balancing policy', () => {
);
const pickFirst = new PickFirstLoadBalancer(channelControlHelper);
pickFirst.updateAddressList(
[{ addresses: [{ host: 'localhost', port: 1 }] }],
statusOrFromValue([{ addresses: [{ host: 'localhost', port: 1 }] }]),
config,
{}
{},
''
);
process.nextTick(() => {
pickFirst.updateAddressList(
[{ addresses: [{ host: 'localhost', port: 2 }] }],
statusOrFromValue([{ addresses: [{ host: 'localhost', port: 2 }] }]),
config,
{}
{},
''
);
process.nextTick(() => {
pickFirst.updateAddressList(
[{ addresses: [{ host: 'localhost', port: 2 }] }],
statusOrFromValue([{ addresses: [{ host: 'localhost', port: 2 }] }]),
config,
{}
{},
''
);
});
});
@ -704,9 +732,10 @@ describe('pick_first load balancing policy', () => {
);
const pickFirst = new PickFirstLoadBalancer(channelControlHelper);
pickFirst.updateAddressList(
[{ addresses: [{ host: 'localhost', port: 1 }] }],
statusOrFromValue([{ addresses: [{ host: 'localhost', port: 1 }] }]),
config,
{}
{},
''
);
process.nextTick(() => {
subchannels[0].transitionToState(ConnectivityState.IDLE);
@ -726,7 +755,20 @@ describe('pick_first load balancing policy', () => {
}
);
const pickFirst = new PickFirstLoadBalancer(channelControlHelper);
pickFirst.updateAddressList([], config, {});
pickFirst.updateAddressList(statusOrFromValue([]), config, {}, '');
});
it('Should report TRANSIENT_FAILURE with an endpoint list error', done => {
const channelControlHelper = createChildChannelControlHelper(
baseChannelControlHelper,
{
updateState: updateStateCallBackForExpectedStateSequence(
[ConnectivityState.TRANSIENT_FAILURE],
done
),
}
);
const pickFirst = new PickFirstLoadBalancer(channelControlHelper);
pickFirst.updateAddressList(statusOrFromError({code: Status.UNAVAILABLE, details: 'Resolver error'}), config, {}, '');
});
describe('Address list randomization', () => {
const shuffleConfig = new PickFirstLoadBalancingConfig(true);
@ -760,20 +802,21 @@ describe('pick_first load balancing policy', () => {
for (let i = 0; i < 10; i++) {
endpoints.push({ addresses: [{ host: 'localhost', port: i + 1 }] });
}
const endpointList = statusOrFromValue(endpoints);
const pickFirst = new PickFirstLoadBalancer(channelControlHelper);
/* Pick from 10 subchannels 5 times, with address randomization enabled,
* and verify that at least two different subchannels are picked. The
* probability choosing the same address every time is 1/10,000, which
* I am considering an acceptable flake rate */
pickFirst.updateAddressList(endpoints, shuffleConfig, {});
pickFirst.updateAddressList(endpointList, shuffleConfig, {}, '');
process.nextTick(() => {
pickFirst.updateAddressList(endpoints, shuffleConfig, {});
pickFirst.updateAddressList(endpointList, shuffleConfig, {}, '');
process.nextTick(() => {
pickFirst.updateAddressList(endpoints, shuffleConfig, {});
pickFirst.updateAddressList(endpointList, shuffleConfig, {}, '');
process.nextTick(() => {
pickFirst.updateAddressList(endpoints, shuffleConfig, {});
pickFirst.updateAddressList(endpointList, shuffleConfig, {}, '');
process.nextTick(() => {
pickFirst.updateAddressList(endpoints, shuffleConfig, {});
pickFirst.updateAddressList(endpointList, shuffleConfig, {}, '');
process.nextTick(() => {
assert(pickedSubchannels.size > 1);
done();
@ -816,16 +859,17 @@ describe('pick_first load balancing policy', () => {
for (let i = 0; i < 10; i++) {
endpoints.push({ addresses: [{ host: 'localhost', port: i + 1 }] });
}
const endpointList = statusOrFromValue(endpoints);
const pickFirst = new PickFirstLoadBalancer(channelControlHelper);
pickFirst.updateAddressList(endpoints, config, {});
pickFirst.updateAddressList(endpointList, config, {}, '');
process.nextTick(() => {
pickFirst.updateAddressList(endpoints, config, {});
pickFirst.updateAddressList(endpointList, config, {}, '');
process.nextTick(() => {
pickFirst.updateAddressList(endpoints, config, {});
pickFirst.updateAddressList(endpointList, config, {}, '');
process.nextTick(() => {
pickFirst.updateAddressList(endpoints, config, {});
pickFirst.updateAddressList(endpointList, config, {}, '');
process.nextTick(() => {
pickFirst.updateAddressList(endpoints, config, {});
pickFirst.updateAddressList(endpointList, config, {}, '');
process.nextTick(() => {
assert(pickedSubchannels.size === 1);
done();

View File

@ -23,7 +23,7 @@ import * as resolver_dns from '../src/resolver-dns';
import * as resolver_uds from '../src/resolver-uds';
import * as resolver_ip from '../src/resolver-ip';
import { ServiceConfig } from '../src/service-config';
import { StatusObject } from '../src/call-interface';
import { StatusOr } from '../src/call-interface';
import {
Endpoint,
SubchannelAddress,
@ -63,25 +63,27 @@ describe('Name Resolver', () => {
const target = resolverManager.mapUriDefaultScheme(
parseUri('localhost:50051')!
)!;
const listener: resolverManager.ResolverListener = {
onSuccessfulResolution: (
endpointList: Endpoint[],
serviceConfig: ServiceConfig | null,
serviceConfigError: StatusObject | null
) => {
// Only handle the first resolution result
listener.onSuccessfulResolution = () => {};
assert(
hasMatchingAddress(endpointList, { host: '127.0.0.1', port: 50051 })
);
assert(
hasMatchingAddress(endpointList, { host: '::1', port: 50051 })
);
done();
},
onError: (error: StatusObject) => {
done(new Error(`Failed with status ${error.details}`));
},
let resultSeen = false;
const listener: resolverManager.ResolverListener = (
maybeEndpointList: StatusOr<Endpoint[]>,
attributes: { [key: string]: unknown},
serviceConfig: StatusOr<ServiceConfig> | null,
resolutionNote: string
) => {
if (resultSeen) {
return true;
}
resultSeen = true;
assert(maybeEndpointList.ok);
const endpointList = maybeEndpointList.value;
assert(
hasMatchingAddress(endpointList, { host: '127.0.0.1', port: 50051 })
);
assert(
hasMatchingAddress(endpointList, { host: '::1', port: 50051 })
);
done();
return true;
};
const resolver = resolverManager.createResolver(target, listener, {});
resolver.updateResolution();
@ -93,65 +95,71 @@ describe('Name Resolver', () => {
const target = resolverManager.mapUriDefaultScheme(
parseUri('localhost')!
)!;
const listener: resolverManager.ResolverListener = {
onSuccessfulResolution: (
endpointList: Endpoint[],
serviceConfig: ServiceConfig | null,
serviceConfigError: StatusObject | null
) => {
// Only handle the first resolution result
listener.onSuccessfulResolution = () => {};
assert(
hasMatchingAddress(endpointList, { host: '127.0.0.1', port: 443 })
);
assert(hasMatchingAddress(endpointList, { host: '::1', port: 443 }));
done();
},
onError: (error: StatusObject) => {
done(new Error(`Failed with status ${error.details}`));
},
let resultSeen = false;
const listener: resolverManager.ResolverListener = (
maybeEndpointList: StatusOr<Endpoint[]>,
attributes: { [key: string]: unknown},
serviceConfig: StatusOr<ServiceConfig> | null,
resolutionNote: string
) => {
if (resultSeen) {
return true;
}
resultSeen = true;
assert(maybeEndpointList.ok);
const endpointList = maybeEndpointList.value;
assert(
hasMatchingAddress(endpointList, { host: '127.0.0.1', port: 443 })
);
assert(hasMatchingAddress(endpointList, { host: '::1', port: 443 }));
done();
return true;
};
const resolver = resolverManager.createResolver(target, listener, {});
resolver.updateResolution();
});
it('Should correctly represent an ipv4 address', done => {
const target = resolverManager.mapUriDefaultScheme(parseUri('1.2.3.4')!)!;
const listener: resolverManager.ResolverListener = {
onSuccessfulResolution: (
endpointList: Endpoint[],
serviceConfig: ServiceConfig | null,
serviceConfigError: StatusObject | null
) => {
// Only handle the first resolution result
listener.onSuccessfulResolution = () => {};
assert(
hasMatchingAddress(endpointList, { host: '1.2.3.4', port: 443 })
);
done();
},
onError: (error: StatusObject) => {
done(new Error(`Failed with status ${error.details}`));
},
let resultSeen = false;
const listener: resolverManager.ResolverListener = (
maybeEndpointList: StatusOr<Endpoint[]>,
attributes: { [key: string]: unknown},
serviceConfig: StatusOr<ServiceConfig> | null,
resolutionNote: string
) => {
if (resultSeen) {
return true;
}
resultSeen = true;
assert(maybeEndpointList.ok);
const endpointList = maybeEndpointList.value;
assert(
hasMatchingAddress(endpointList, { host: '1.2.3.4', port: 443 })
);
done();
return true;
};
const resolver = resolverManager.createResolver(target, listener, {});
resolver.updateResolution();
});
it('Should correctly represent an ipv6 address', done => {
const target = resolverManager.mapUriDefaultScheme(parseUri('::1')!)!;
const listener: resolverManager.ResolverListener = {
onSuccessfulResolution: (
endpointList: Endpoint[],
serviceConfig: ServiceConfig | null,
serviceConfigError: StatusObject | null
) => {
// Only handle the first resolution result
listener.onSuccessfulResolution = () => {};
assert(hasMatchingAddress(endpointList, { host: '::1', port: 443 }));
done();
},
onError: (error: StatusObject) => {
done(new Error(`Failed with status ${error.details}`));
},
let resultSeen = false;
const listener: resolverManager.ResolverListener = (
maybeEndpointList: StatusOr<Endpoint[]>,
attributes: { [key: string]: unknown},
serviceConfig: StatusOr<ServiceConfig> | null,
resolutionNote: string
) => {
if (resultSeen) {
return true;
}
resultSeen = true;
assert(maybeEndpointList.ok);
const endpointList = maybeEndpointList.value;
assert(hasMatchingAddress(endpointList, { host: '::1', port: 443 }));
done();
return true;
};
const resolver = resolverManager.createResolver(target, listener, {});
resolver.updateResolution();
@ -160,22 +168,24 @@ describe('Name Resolver', () => {
const target = resolverManager.mapUriDefaultScheme(
parseUri('[::1]:50051')!
)!;
const listener: resolverManager.ResolverListener = {
onSuccessfulResolution: (
endpointList: Endpoint[],
serviceConfig: ServiceConfig | null,
serviceConfigError: StatusObject | null
) => {
// Only handle the first resolution result
listener.onSuccessfulResolution = () => {};
assert(
hasMatchingAddress(endpointList, { host: '::1', port: 50051 })
);
done();
},
onError: (error: StatusObject) => {
done(new Error(`Failed with status ${error.details}`));
},
let resultSeen = false;
const listener: resolverManager.ResolverListener = (
maybeEndpointList: StatusOr<Endpoint[]>,
attributes: { [key: string]: unknown},
serviceConfig: StatusOr<ServiceConfig> | null,
resolutionNote: string
) => {
if (resultSeen) {
return true;
}
resultSeen = true;
assert(maybeEndpointList.ok);
const endpointList = maybeEndpointList.value;
assert(
hasMatchingAddress(endpointList, { host: '::1', port: 50051 })
);
done();
return true;
};
const resolver = resolverManager.createResolver(target, listener, {});
resolver.updateResolution();
@ -184,20 +194,22 @@ describe('Name Resolver', () => {
const target = resolverManager.mapUriDefaultScheme(
parseUri('example.com')!
)!;
const listener: resolverManager.ResolverListener = {
onSuccessfulResolution: (
endpointList: Endpoint[],
serviceConfig: ServiceConfig | null,
serviceConfigError: StatusObject | null
) => {
// Only handle the first resolution result
listener.onSuccessfulResolution = () => {};
assert(endpointList.length > 0);
done();
},
onError: (error: StatusObject) => {
done(new Error(`Failed with status ${error.details}`));
},
let resultSeen = false;
const listener: resolverManager.ResolverListener = (
maybeEndpointList: StatusOr<Endpoint[]>,
attributes: { [key: string]: unknown},
serviceConfig: StatusOr<ServiceConfig> | null,
resolutionNote: string
) => {
if (resultSeen) {
return true;
}
resultSeen = true;
assert(maybeEndpointList.ok);
const endpointList = maybeEndpointList.value;
assert(endpointList.length > 0);
done();
return true;
};
const resolver = resolverManager.createResolver(target, listener, {});
resolver.updateResolution();
@ -208,23 +220,21 @@ describe('Name Resolver', () => {
const target = resolverManager.mapUriDefaultScheme(
parseUri('grpctest.kleinsch.com')!
)!;
const listener: resolverManager.ResolverListener = {
onSuccessfulResolution: (
endpointList: Endpoint[],
serviceConfig: ServiceConfig | null,
serviceConfigError: StatusObject | null
) => {
if (serviceConfig !== null) {
assert(
serviceConfig.loadBalancingPolicy === 'round_robin',
'Should have found round robin LB policy'
);
done();
}
},
onError: (error: StatusObject) => {
done(new Error(`Failed with status ${error.details}`));
},
const listener: resolverManager.ResolverListener = (
maybeEndpointList: StatusOr<Endpoint[]>,
attributes: { [key: string]: unknown},
serviceConfig: StatusOr<ServiceConfig> | null,
resolutionNote: string
) => {
if (serviceConfig !== null) {
assert(serviceConfig.ok);
assert(
serviceConfig.value.loadBalancingPolicy === 'round_robin',
'Should have found round robin LB policy'
);
done();
}
return true;
};
const resolver = resolverManager.createResolver(target, listener, {});
resolver.updateResolution();
@ -234,21 +244,18 @@ describe('Name Resolver', () => {
parseUri('grpctest.kleinsch.com')!
)!;
let count = 0;
const listener: resolverManager.ResolverListener = {
onSuccessfulResolution: (
endpointList: Endpoint[],
serviceConfig: ServiceConfig | null,
serviceConfigError: StatusObject | null
) => {
assert(
serviceConfig === null,
'Should not have found service config'
);
count++;
},
onError: (error: StatusObject) => {
done(new Error(`Failed with status ${error.details}`));
},
const listener: resolverManager.ResolverListener = (
maybeEndpointList: StatusOr<Endpoint[]>,
attributes: { [key: string]: unknown},
serviceConfig: StatusOr<ServiceConfig> | null,
resolutionNote: string
) => {
assert(
serviceConfig === null,
'Should not have found service config'
);
count++;
return true;
};
const resolver = resolverManager.createResolver(target, listener, {
'grpc.service_config_disable_resolution': 1,
@ -271,25 +278,27 @@ describe('Name Resolver', () => {
const target = resolverManager.mapUriDefaultScheme(
parseUri('loopback4.unittest.grpc.io')!
)!;
const listener: resolverManager.ResolverListener = {
onSuccessfulResolution: (
endpointList: Endpoint[],
serviceConfig: ServiceConfig | null,
serviceConfigError: StatusObject | null
) => {
// Only handle the first resolution result
listener.onSuccessfulResolution = () => {};
assert(
hasMatchingAddress(endpointList, { host: '127.0.0.1', port: 443 }),
`None of [${endpointList.map(addr =>
endpointToString(addr)
)}] matched '127.0.0.1:443'`
);
done();
},
onError: (error: StatusObject) => {
done(new Error(`Failed with status ${error.details}`));
},
let resultSeen = false;
const listener: resolverManager.ResolverListener = (
maybeEndpointList: StatusOr<Endpoint[]>,
attributes: { [key: string]: unknown},
serviceConfig: StatusOr<ServiceConfig> | null,
resolutionNote: string
) => {
if (resultSeen) {
return true;
}
resultSeen = true;
assert(maybeEndpointList.ok);
const endpointList = maybeEndpointList.value;
assert(
hasMatchingAddress(endpointList, { host: '127.0.0.1', port: 443 }),
`None of [${endpointList.map(addr =>
endpointToString(addr)
)}] matched '127.0.0.1:443'`
);
done();
return true;
};
const resolver = resolverManager.createResolver(target, listener, {});
resolver.updateResolution();
@ -300,20 +309,22 @@ describe('Name Resolver', () => {
const target = resolverManager.mapUriDefaultScheme(
parseUri('loopback6.unittest.grpc.io')!
)!;
const listener: resolverManager.ResolverListener = {
onSuccessfulResolution: (
endpointList: Endpoint[],
serviceConfig: ServiceConfig | null,
serviceConfigError: StatusObject | null
) => {
// Only handle the first resolution result
listener.onSuccessfulResolution = () => {};
assert(hasMatchingAddress(endpointList, { host: '::1', port: 443 }));
done();
},
onError: (error: StatusObject) => {
done(new Error(`Failed with status ${error.details}`));
},
let resultSeen = false;
const listener: resolverManager.ResolverListener = (
maybeEndpointList: StatusOr<Endpoint[]>,
attributes: { [key: string]: unknown},
serviceConfig: StatusOr<ServiceConfig> | null,
resolutionNote: string
) => {
if (resultSeen) {
return true;
}
resultSeen = true;
assert(maybeEndpointList.ok);
const endpointList = maybeEndpointList.value;
assert(hasMatchingAddress(endpointList, { host: '::1', port: 443 }));
done();
return true;
};
const resolver = resolverManager.createResolver(target, listener, {});
resolver.updateResolution();
@ -325,27 +336,27 @@ describe('Name Resolver', () => {
const target = resolverManager.mapUriDefaultScheme(
parseUri('loopback46.unittest.grpc.io')!
)!;
const listener: resolverManager.ResolverListener = {
onSuccessfulResolution: (
endpointList: Endpoint[],
serviceConfig: ServiceConfig | null,
serviceConfigError: StatusObject | null
) => {
// Only handle the first resolution result
listener.onSuccessfulResolution = () => {};
assert(
hasMatchingAddress(endpointList, { host: '127.0.0.1', port: 443 }),
`None of [${endpointList.map(addr =>
endpointToString(addr)
)}] matched '127.0.0.1:443'`
);
/* TODO(murgatroid99): check for IPv6 result, once we can get that
* consistently */
done();
},
onError: (error: StatusObject) => {
done(new Error(`Failed with status ${error.details}`));
},
let resultSeen = false;
const listener: resolverManager.ResolverListener = (
maybeEndpointList: StatusOr<Endpoint[]>,
attributes: { [key: string]: unknown},
serviceConfig: StatusOr<ServiceConfig> | null,
resolutionNote: string
) => {
if (resultSeen) {
return true;
}
resultSeen = true;
assert(maybeEndpointList.ok);
const endpointList = maybeEndpointList.value;
assert(
hasMatchingAddress(endpointList, { host: '127.0.0.1', port: 443 }),
`None of [${endpointList.map(addr =>
endpointToString(addr)
)}] matched '127.0.0.1:443'`
);
done();
return true;
};
const resolver = resolverManager.createResolver(target, listener, {});
resolver.updateResolution();
@ -356,20 +367,22 @@ describe('Name Resolver', () => {
const target = resolverManager.mapUriDefaultScheme(
parseUri('network-tools.com')!
)!;
const listener: resolverManager.ResolverListener = {
onSuccessfulResolution: (
endpointList: Endpoint[],
serviceConfig: ServiceConfig | null,
serviceConfigError: StatusObject | null
) => {
// Only handle the first resolution result
listener.onSuccessfulResolution = () => {};
assert(endpointList.length > 0);
done();
},
onError: (error: StatusObject) => {
done(new Error(`Failed with status ${error.details}`));
},
let resultSeen = false;
const listener: resolverManager.ResolverListener = (
maybeEndpointList: StatusOr<Endpoint[]>,
attributes: { [key: string]: unknown},
serviceConfig: StatusOr<ServiceConfig> | null,
resolutionNote: string
) => {
if (resultSeen) {
return true;
}
resultSeen = true;
assert(maybeEndpointList.ok);
const endpointList = maybeEndpointList.value;
assert(endpointList.length > 0);
done();
return true;
};
const resolver = resolverManager.createResolver(target, listener, {});
resolver.updateResolution();
@ -386,23 +399,23 @@ describe('Name Resolver', () => {
const target2 = resolverManager.mapUriDefaultScheme(
parseUri('grpc-test4.sandbox.googleapis.com')!
)!;
const listener: resolverManager.ResolverListener = {
onSuccessfulResolution: (
endpointList: Endpoint[],
serviceConfig: ServiceConfig | null,
serviceConfigError: StatusObject | null
) => {
assert(endpointList.length > 0);
completeCount += 1;
if (completeCount === 2) {
// Only handle the first resolution result
listener.onSuccessfulResolution = () => {};
done();
}
},
onError: (error: StatusObject) => {
done(new Error(`Failed with status ${error.details}`));
},
const listener: resolverManager.ResolverListener = (
maybeEndpointList: StatusOr<Endpoint[]>,
attributes: { [key: string]: unknown},
serviceConfig: StatusOr<ServiceConfig> | null,
resolutionNote: string
) => {
if (completeCount >= 2) {
return true;
}
assert(maybeEndpointList.ok);
const endpointList = maybeEndpointList.value;
assert(endpointList.length > 0);
completeCount += 1;
if (completeCount === 2) {
done();
}
return true;
};
const resolver1 = resolverManager.createResolver(target1, listener, {});
resolver1.updateResolution();
@ -419,26 +432,25 @@ describe('Name Resolver', () => {
let resultCount = 0;
const resolver = resolverManager.createResolver(
target,
{
onSuccessfulResolution: (
endpointList: Endpoint[],
serviceConfig: ServiceConfig | null,
serviceConfigError: StatusObject | null
) => {
assert(
hasMatchingAddress(endpointList, { host: '127.0.0.1', port: 443 })
);
assert(
hasMatchingAddress(endpointList, { host: '::1', port: 443 })
);
resultCount += 1;
if (resultCount === 1) {
process.nextTick(() => resolver.updateResolution());
}
},
onError: (error: StatusObject) => {
assert.ifError(error);
},
(
maybeEndpointList: StatusOr<Endpoint[]>,
attributes: { [key: string]: unknown},
serviceConfig: StatusOr<ServiceConfig> | null,
resolutionNote: string
) => {
assert(maybeEndpointList.ok);
const endpointList = maybeEndpointList.value;
assert(
hasMatchingAddress(endpointList, { host: '127.0.0.1', port: 443 })
);
assert(
hasMatchingAddress(endpointList, { host: '::1', port: 443 })
);
resultCount += 1;
if (resultCount === 1) {
process.nextTick(() => resolver.updateResolution());
}
return true;
},
{ 'grpc.dns_min_time_between_resolutions_ms': 2000 }
);
@ -455,20 +467,18 @@ describe('Name Resolver', () => {
let resultCount = 0;
const resolver = resolverManager.createResolver(
target,
{
onSuccessfulResolution: (
endpointList: Endpoint[],
serviceConfig: ServiceConfig | null,
serviceConfigError: StatusObject | null
) => {
assert.fail('Resolution succeeded unexpectedly');
},
onError: (error: StatusObject) => {
resultCount += 1;
if (resultCount === 1) {
process.nextTick(() => resolver.updateResolution());
}
},
(
maybeEndpointList: StatusOr<Endpoint[]>,
attributes: { [key: string]: unknown},
serviceConfig: StatusOr<ServiceConfig> | null,
resolutionNote: string
) => {
assert(!maybeEndpointList.ok);
resultCount += 1;
if (resultCount === 1) {
process.nextTick(() => resolver.updateResolution());
}
return true;
},
{}
);
@ -484,20 +494,22 @@ describe('Name Resolver', () => {
const target = resolverManager.mapUriDefaultScheme(
parseUri('unix:socket')!
)!;
const listener: resolverManager.ResolverListener = {
onSuccessfulResolution: (
endpointList: Endpoint[],
serviceConfig: ServiceConfig | null,
serviceConfigError: StatusObject | null
) => {
// Only handle the first resolution result
listener.onSuccessfulResolution = () => {};
assert(hasMatchingAddress(endpointList, { path: 'socket' }));
done();
},
onError: (error: StatusObject) => {
done(new Error(`Failed with status ${error.details}`));
},
let resultSeen = false;
const listener: resolverManager.ResolverListener = (
maybeEndpointList: StatusOr<Endpoint[]>,
attributes: { [key: string]: unknown},
serviceConfig: StatusOr<ServiceConfig> | null,
resolutionNote: string
) => {
if (resultSeen) {
return true;
}
resultSeen = true;
assert(maybeEndpointList.ok);
const endpointList = maybeEndpointList.value;
assert(hasMatchingAddress(endpointList, { path: 'socket' }));
done();
return true;
};
const resolver = resolverManager.createResolver(target, listener, {});
resolver.updateResolution();
@ -506,20 +518,22 @@ describe('Name Resolver', () => {
const target = resolverManager.mapUriDefaultScheme(
parseUri('unix:///tmp/socket')!
)!;
const listener: resolverManager.ResolverListener = {
onSuccessfulResolution: (
endpointList: Endpoint[],
serviceConfig: ServiceConfig | null,
serviceConfigError: StatusObject | null
) => {
// Only handle the first resolution result
listener.onSuccessfulResolution = () => {};
assert(hasMatchingAddress(endpointList, { path: '/tmp/socket' }));
done();
},
onError: (error: StatusObject) => {
done(new Error(`Failed with status ${error.details}`));
},
let resultSeen = false;
const listener: resolverManager.ResolverListener = (
maybeEndpointList: StatusOr<Endpoint[]>,
attributes: { [key: string]: unknown},
serviceConfig: StatusOr<ServiceConfig> | null,
resolutionNote: string
) => {
if (resultSeen) {
return true;
}
resultSeen = true;
assert(maybeEndpointList.ok);
const endpointList = maybeEndpointList.value;
assert(hasMatchingAddress(endpointList, { path: '/tmp/socket' }));
done();
return true;
};
const resolver = resolverManager.createResolver(target, listener, {});
resolver.updateResolution();
@ -530,22 +544,24 @@ describe('Name Resolver', () => {
const target = resolverManager.mapUriDefaultScheme(
parseUri('ipv4:127.0.0.1')!
)!;
const listener: resolverManager.ResolverListener = {
onSuccessfulResolution: (
endpointList: Endpoint[],
serviceConfig: ServiceConfig | null,
serviceConfigError: StatusObject | null
) => {
// Only handle the first resolution result
listener.onSuccessfulResolution = () => {};
assert(
hasMatchingAddress(endpointList, { host: '127.0.0.1', port: 443 })
);
done();
},
onError: (error: StatusObject) => {
done(new Error(`Failed with status ${error.details}`));
},
let resultSeen = false;
const listener: resolverManager.ResolverListener = (
maybeEndpointList: StatusOr<Endpoint[]>,
attributes: { [key: string]: unknown},
serviceConfig: StatusOr<ServiceConfig> | null,
resolutionNote: string
) => {
if (resultSeen) {
return true;
}
resultSeen = true;
assert(maybeEndpointList.ok);
const endpointList = maybeEndpointList.value;
assert(
hasMatchingAddress(endpointList, { host: '127.0.0.1', port: 443 })
);
done();
return true;
};
const resolver = resolverManager.createResolver(target, listener, {});
resolver.updateResolution();
@ -554,22 +570,24 @@ describe('Name Resolver', () => {
const target = resolverManager.mapUriDefaultScheme(
parseUri('ipv4:127.0.0.1:50051')!
)!;
const listener: resolverManager.ResolverListener = {
onSuccessfulResolution: (
endpointList: Endpoint[],
serviceConfig: ServiceConfig | null,
serviceConfigError: StatusObject | null
) => {
// Only handle the first resolution result
listener.onSuccessfulResolution = () => {};
assert(
hasMatchingAddress(endpointList, { host: '127.0.0.1', port: 50051 })
);
done();
},
onError: (error: StatusObject) => {
done(new Error(`Failed with status ${error.details}`));
},
let resultSeen = false;
const listener: resolverManager.ResolverListener = (
maybeEndpointList: StatusOr<Endpoint[]>,
attributes: { [key: string]: unknown},
serviceConfig: StatusOr<ServiceConfig> | null,
resolutionNote: string
) => {
if (resultSeen) {
return true;
}
resultSeen = true;
assert(maybeEndpointList.ok);
const endpointList = maybeEndpointList.value;
assert(
hasMatchingAddress(endpointList, { host: '127.0.0.1', port: 50051 })
);
done();
return true;
};
const resolver = resolverManager.createResolver(target, listener, {});
resolver.updateResolution();
@ -578,25 +596,27 @@ describe('Name Resolver', () => {
const target = resolverManager.mapUriDefaultScheme(
parseUri('ipv4:127.0.0.1:50051,127.0.0.1:50052')!
)!;
const listener: resolverManager.ResolverListener = {
onSuccessfulResolution: (
endpointList: Endpoint[],
serviceConfig: ServiceConfig | null,
serviceConfigError: StatusObject | null
) => {
// Only handle the first resolution result
listener.onSuccessfulResolution = () => {};
assert(
hasMatchingAddress(endpointList, { host: '127.0.0.1', port: 50051 })
);
assert(
hasMatchingAddress(endpointList, { host: '127.0.0.1', port: 50052 })
);
done();
},
onError: (error: StatusObject) => {
done(new Error(`Failed with status ${error.details}`));
},
let resultSeen = false;
const listener: resolverManager.ResolverListener = (
maybeEndpointList: StatusOr<Endpoint[]>,
attributes: { [key: string]: unknown},
serviceConfig: StatusOr<ServiceConfig> | null,
resolutionNote: string
) => {
if (resultSeen) {
return true;
}
resultSeen = true;
assert(maybeEndpointList.ok);
const endpointList = maybeEndpointList.value;
assert(
hasMatchingAddress(endpointList, { host: '127.0.0.1', port: 50051 })
);
assert(
hasMatchingAddress(endpointList, { host: '127.0.0.1', port: 50052 })
);
done();
return true;
};
const resolver = resolverManager.createResolver(target, listener, {});
resolver.updateResolution();
@ -605,20 +625,22 @@ describe('Name Resolver', () => {
const target = resolverManager.mapUriDefaultScheme(
parseUri('ipv6:::1')!
)!;
const listener: resolverManager.ResolverListener = {
onSuccessfulResolution: (
endpointList: Endpoint[],
serviceConfig: ServiceConfig | null,
serviceConfigError: StatusObject | null
) => {
// Only handle the first resolution result
listener.onSuccessfulResolution = () => {};
assert(hasMatchingAddress(endpointList, { host: '::1', port: 443 }));
done();
},
onError: (error: StatusObject) => {
done(new Error(`Failed with status ${error.details}`));
},
let resultSeen = false;
const listener: resolverManager.ResolverListener = (
maybeEndpointList: StatusOr<Endpoint[]>,
attributes: { [key: string]: unknown},
serviceConfig: StatusOr<ServiceConfig> | null,
resolutionNote: string
) => {
if (resultSeen) {
return true;
}
resultSeen = true;
assert(maybeEndpointList.ok);
const endpointList = maybeEndpointList.value;
assert(hasMatchingAddress(endpointList, { host: '::1', port: 443 }));
done();
return true;
};
const resolver = resolverManager.createResolver(target, listener, {});
resolver.updateResolution();
@ -627,22 +649,24 @@ describe('Name Resolver', () => {
const target = resolverManager.mapUriDefaultScheme(
parseUri('ipv6:[::1]:50051')!
)!;
const listener: resolverManager.ResolverListener = {
onSuccessfulResolution: (
endpointList: Endpoint[],
serviceConfig: ServiceConfig | null,
serviceConfigError: StatusObject | null
) => {
// Only handle the first resolution result
listener.onSuccessfulResolution = () => {};
assert(
hasMatchingAddress(endpointList, { host: '::1', port: 50051 })
);
done();
},
onError: (error: StatusObject) => {
done(new Error(`Failed with status ${error.details}`));
},
let resultSeen = false;
const listener: resolverManager.ResolverListener = (
maybeEndpointList: StatusOr<Endpoint[]>,
attributes: { [key: string]: unknown},
serviceConfig: StatusOr<ServiceConfig> | null,
resolutionNote: string
) => {
if (resultSeen) {
return true;
}
resultSeen = true;
assert(maybeEndpointList.ok);
const endpointList = maybeEndpointList.value;
assert(
hasMatchingAddress(endpointList, { host: '::1', port: 50051 })
);
done();
return true;
};
const resolver = resolverManager.createResolver(target, listener, {});
resolver.updateResolution();
@ -651,25 +675,27 @@ describe('Name Resolver', () => {
const target = resolverManager.mapUriDefaultScheme(
parseUri('ipv6:[::1]:50051,[::1]:50052')!
)!;
const listener: resolverManager.ResolverListener = {
onSuccessfulResolution: (
endpointList: Endpoint[],
serviceConfig: ServiceConfig | null,
serviceConfigError: StatusObject | null
) => {
// Only handle the first resolution result
listener.onSuccessfulResolution = () => {};
assert(
hasMatchingAddress(endpointList, { host: '::1', port: 50051 })
);
assert(
hasMatchingAddress(endpointList, { host: '::1', port: 50052 })
);
done();
},
onError: (error: StatusObject) => {
done(new Error(`Failed with status ${error.details}`));
},
let resultSeen = false;
const listener: resolverManager.ResolverListener = (
maybeEndpointList: StatusOr<Endpoint[]>,
attributes: { [key: string]: unknown},
serviceConfig: StatusOr<ServiceConfig> | null,
resolutionNote: string
) => {
if (resultSeen) {
return true;
}
resultSeen = true;
assert(maybeEndpointList.ok);
const endpointList = maybeEndpointList.value;
assert(
hasMatchingAddress(endpointList, { host: '::1', port: 50051 })
);
assert(
hasMatchingAddress(endpointList, { host: '::1', port: 50052 })
);
done();
return true;
};
const resolver = resolverManager.createResolver(target, listener, {});
resolver.updateResolution();

View File

@ -127,6 +127,22 @@ const testHeaderInjectionInterceptor: grpc.ServerInterceptor = (
});
};
const callExaminationInterceptor: grpc.ServerInterceptor = (
methodDescriptor,
call
) => {
const connectionInfo = call.getConnectionInfo();
return new grpc.ServerInterceptingCall(call, {
sendMetadata: (metadata, next) => {
metadata.add('local-address', `${connectionInfo.localAddress}`);
metadata.add('local-port', `${connectionInfo.localPort}`);
metadata.add('remote-address', `${connectionInfo.remoteAddress}`);
metadata.add('remote-port', `${connectionInfo.remotePort}`);
next(metadata);
}
})
}
describe('Server interceptors', () => {
describe('Auth-type interceptor', () => {
let server: grpc.Server;
@ -336,4 +352,45 @@ describe('Server interceptors', () => {
});
});
});
describe('Call properties', () => {
let server: grpc.Server;
let client: TestClient;
let portNum: number;
/* Tests that an interceptor can entirely prevent the handler from being
* invoked, based on the contents of the metadata. */
before(done => {
server = new grpc.Server({ interceptors: [callExaminationInterceptor] });
server.addService(echoService.service, {
echo: (
call: grpc.ServerUnaryCall<any, any>,
callback: grpc.sendUnaryData<any>
) => {
callback(null, call.request);
},
});
server.bindAsync(
'[::1]:0',
grpc.ServerCredentials.createInsecure(),
(error, port) => {
assert.ifError(error);
client = new TestClient(`localhost:${port}`, false);
portNum = port;
done();
}
);
});
after(done => {
client.close();
server.tryShutdown(done);
});
it('Should get valid connection information', done => {
const call = client.sendRequest(done);
call.on('metadata', metadata => {
assert.strictEqual(metadata.get('local-address')[0], '::1');
assert.strictEqual(metadata.get('remote-address')[0], '::1');
assert.strictEqual(metadata.get('local-port')[0], `${portNum}`);
assert.notStrictEqual(metadata.get('remote-port')[0], 'undefined');
});
});
});
});

View File

@ -61,44 +61,53 @@ The `proto-loader-gen-types` script distributed with this package can be used to
proto-loader-gen-types.js [options] filenames...
Options:
--help Show help [boolean]
--version Show version number [boolean]
--keepCase Preserve the case of field names
--help Show help [boolean]
--version Show version number [boolean]
--keepCase Preserve the case of field names
[boolean] [default: false]
--longs The type that should be used to output 64 bit integer
values. Can be String, Number[string] [default: "Long"]
--enums The type that should be used to output enum fields. Can
be String [string] [default: "number"]
--bytes The type that should be used to output bytes fields.
Can be String, Array [string] [default: "Buffer"]
--defaults Output default values for omitted fields
--longs The type that should be used to output 64 bit
integer values. Can be String, Number
[string] [default: "Long"]
--enums The type that should be used to output enum fields.
Can be String [string] [default: "number"]
--bytes The type that should be used to output bytes
fields. Can be String, Array
[string] [default: "Buffer"]
--defaults Output default values for omitted fields
[boolean] [default: false]
--arrays Output default values for omitted repeated fields even
if --defaults is not set [boolean] [default: false]
--objects Output default values for omitted message fields even
if --defaults is not set [boolean] [default: false]
--oneofs Output virtual oneof fields set to the present field's
name [boolean] [default: false]
--json Represent Infinity and NaN as strings in float fields.
Also decode google.protobuf.Any automatically
--arrays Output default values for omitted repeated fields
even if --defaults is not set
[boolean] [default: false]
--includeComments Generate doc comments from comments in the original
files [boolean] [default: false]
-I, --includeDirs Directories to search for included files [array]
-O, --outDir Directory in which to output files [string] [required]
--grpcLib The gRPC implementation library that these types will
be used with. If not provided, some types will not be
generated [string]
--inputTemplate Template for mapping input or "permissive" type names
[string] [default: "%s"]
--outputTemplate Template for mapping output or "restricted" type names
[string] [default: "%s__Output"]
--inputBranded Output property for branded type for "permissive"
types with fullName of the Message as its value
--objects Output default values for omitted message fields
even if --defaults is not set
[boolean] [default: false]
--outputBranded Output property for branded type for "restricted"
types with fullName of the Message as its value
--oneofs Output virtual oneof fields set to the present
field's name [boolean] [default: false]
--json Represent Infinity and NaN as strings in float
fields. Also decode google.protobuf.Any
automatically [boolean] [default: false]
--includeComments Generate doc comments from comments in the original
files [boolean] [default: false]
-I, --includeDirs Directories to search for included files [array]
-O, --outDir Directory in which to output files
[string] [required]
--grpcLib The gRPC implementation library that these types
will be used with. If not provided, some types will
not be generated [string]
--inputTemplate Template for mapping input or "permissive" type
names [string] [default: "%s"]
--outputTemplate Template for mapping output or "restricted" type
names [string] [default: "%s__Output"]
--inputBranded Output property for branded type for "permissive"
types with fullName of the Message as its value
[boolean] [default: false]
--outputBranded Output property for branded type for "restricted"
types with fullName of the Message as its value
[boolean] [default: false]
--targetFileExtension File extension for generated files.
[string] [default: ".ts"]
--importFileExtension File extension for import specifiers in generated
code. [string] [default: ""]
```
### Example Usage

View File

@ -47,6 +47,8 @@ type GeneratorOptions = Protobuf.IParseOptions & Protobuf.IConversionOptions & {
outputTemplate: string;
inputBranded: boolean;
outputBranded: boolean;
targetFileExtension?: string;
importFileExtension?: string;
}
class TextFormatter {
@ -105,8 +107,8 @@ function getImportPath(to: Protobuf.Type | Protobuf.Enum | Protobuf.Service): st
return stripLeadingPeriod(to.fullName).replace(/\./g, '/');
}
function getPath(to: Protobuf.Type | Protobuf.Enum | Protobuf.Service) {
return stripLeadingPeriod(to.fullName).replace(/\./g, '/') + '.ts';
function getPath(to: Protobuf.Type | Protobuf.Enum | Protobuf.Service, options: GeneratorOptions) {
return stripLeadingPeriod(to.fullName).replace(/\./g, '/') + options.targetFileExtension;
}
function getPathToRoot(from: Protobuf.NamespaceBase) {
@ -153,7 +155,7 @@ function getImportLine(dependency: Protobuf.Type | Protobuf.Enum | Protobuf.Serv
throw new Error('Invalid object passed to getImportLine');
}
}
return `import type { ${importedTypes} } from '${filePath}';`
return `import type { ${importedTypes} } from '${filePath}${options.importFileExtension}';`
}
function getChildMessagesAndEnums(namespace: Protobuf.NamespaceBase): (Protobuf.Type | Protobuf.Enum)[] {
@ -787,21 +789,21 @@ function generateFilesForNamespace(namespace: Protobuf.NamespaceBase, options: G
if (nested instanceof Protobuf.Type) {
generateMessageInterfaces(fileFormatter, nested, options);
if (options.verbose) {
console.log(`Writing ${options.outDir}/${getPath(nested)} from file ${nested.filename}`);
console.log(`Writing ${options.outDir}/${getPath(nested, options)} from file ${nested.filename}`);
}
filePromises.push(writeFile(`${options.outDir}/${getPath(nested)}`, fileFormatter.getFullText()));
filePromises.push(writeFile(`${options.outDir}/${getPath(nested, options)}`, fileFormatter.getFullText()));
} else if (nested instanceof Protobuf.Enum) {
generateEnumInterface(fileFormatter, nested, options);
if (options.verbose) {
console.log(`Writing ${options.outDir}/${getPath(nested)} from file ${nested.filename}`);
console.log(`Writing ${options.outDir}/${getPath(nested, options)} from file ${nested.filename}`);
}
filePromises.push(writeFile(`${options.outDir}/${getPath(nested)}`, fileFormatter.getFullText()));
filePromises.push(writeFile(`${options.outDir}/${getPath(nested, options)}`, fileFormatter.getFullText()));
} else if (nested instanceof Protobuf.Service) {
generateServiceInterfaces(fileFormatter, nested, options);
if (options.verbose) {
console.log(`Writing ${options.outDir}/${getPath(nested)} from file ${nested.filename}`);
console.log(`Writing ${options.outDir}/${getPath(nested, options)} from file ${nested.filename}`);
}
filePromises.push(writeFile(`${options.outDir}/${getPath(nested)}`, fileFormatter.getFullText()));
filePromises.push(writeFile(`${options.outDir}/${getPath(nested, options)}`, fileFormatter.getFullText()));
} else if (isNamespaceBase(nested)) {
filePromises.push(...generateFilesForNamespace(nested, options));
}
@ -877,6 +879,8 @@ async function runScript() {
.option('outputTemplate', { string: true, default: `${templateStr}__Output` })
.option('inputBranded', boolDefaultFalseOption)
.option('outputBranded', boolDefaultFalseOption)
.option('targetFileExtension', { string: true, default: '.ts' })
.option('importFileExtension', { string: true, default: '' })
.coerce('longs', value => {
switch (value) {
case 'String': return String;
@ -916,6 +920,8 @@ async function runScript() {
outputTemplate: 'Template for mapping output or "restricted" type names',
inputBranded: 'Output property for branded type for "permissive" types with fullName of the Message as its value',
outputBranded: 'Output property for branded type for "restricted" types with fullName of the Message as its value',
targetFileExtension: 'File extension for generated files.',
importFileExtension: 'File extension for import specifiers in generated code.'
}).demandOption(['outDir'])
.demand(1)
.usage('$0 [options] filenames...')