mirror of https://github.com/grpc/grpc-node.git
grpc-js: Use an object to represent subchannel addresses
This commit is contained in:
parent
6994f1b1df
commit
57c18382d8
|
|
@ -38,6 +38,7 @@ import { getDefaultAuthority } from './resolver';
|
||||||
import { LoadBalancingConfig } from './load-balancing-config';
|
import { LoadBalancingConfig } from './load-balancing-config';
|
||||||
import { ServiceConfig, validateServiceConfig } from './service-config';
|
import { ServiceConfig, validateServiceConfig } from './service-config';
|
||||||
import { trace } from './logging';
|
import { trace } from './logging';
|
||||||
|
import { SubchannelAddress } from './subchannel';
|
||||||
|
|
||||||
export enum ConnectivityState {
|
export enum ConnectivityState {
|
||||||
CONNECTING,
|
CONNECTING,
|
||||||
|
|
@ -145,7 +146,7 @@ export class ChannelImplementation implements Channel {
|
||||||
this.subchannelPool = getSubchannelPool((options['grpc.use_local_subchannel_pool'] ?? 0) === 0);
|
this.subchannelPool = getSubchannelPool((options['grpc.use_local_subchannel_pool'] ?? 0) === 0);
|
||||||
const channelControlHelper: ChannelControlHelper = {
|
const channelControlHelper: ChannelControlHelper = {
|
||||||
createSubchannel: (
|
createSubchannel: (
|
||||||
subchannelAddress: string,
|
subchannelAddress: SubchannelAddress,
|
||||||
subchannelArgs: ChannelOptions
|
subchannelArgs: ChannelOptions
|
||||||
) => {
|
) => {
|
||||||
return this.subchannelPool.getOrCreateSubchannel(
|
return this.subchannelPool.getOrCreateSubchannel(
|
||||||
|
|
|
||||||
|
|
@ -30,7 +30,7 @@ import {
|
||||||
UnavailablePicker,
|
UnavailablePicker,
|
||||||
} from './picker';
|
} from './picker';
|
||||||
import { LoadBalancingConfig } from './load-balancing-config';
|
import { LoadBalancingConfig } from './load-balancing-config';
|
||||||
import { Subchannel, ConnectivityStateListener } from './subchannel';
|
import { Subchannel, ConnectivityStateListener, SubchannelAddress } from './subchannel';
|
||||||
import * as logging from './logging';
|
import * as logging from './logging';
|
||||||
import { LogVerbosity } from './constants';
|
import { LogVerbosity } from './constants';
|
||||||
|
|
||||||
|
|
@ -76,7 +76,7 @@ export class PickFirstLoadBalancer implements LoadBalancer {
|
||||||
/**
|
/**
|
||||||
* The list of backend addresses most recently passed to `updateAddressList`.
|
* The list of backend addresses most recently passed to `updateAddressList`.
|
||||||
*/
|
*/
|
||||||
private latestAddressList: string[] = [];
|
private latestAddressList: SubchannelAddress[] = [];
|
||||||
/**
|
/**
|
||||||
* The list of subchannels this load balancer is currently attempting to
|
* The list of subchannels this load balancer is currently attempting to
|
||||||
* connect to.
|
* connect to.
|
||||||
|
|
@ -369,7 +369,7 @@ export class PickFirstLoadBalancer implements LoadBalancer {
|
||||||
}
|
}
|
||||||
|
|
||||||
updateAddressList(
|
updateAddressList(
|
||||||
addressList: string[],
|
addressList: SubchannelAddress[],
|
||||||
lbConfig: LoadBalancingConfig | null
|
lbConfig: LoadBalancingConfig | null
|
||||||
): void {
|
): void {
|
||||||
// lbConfig has no useful information for pick first load balancing
|
// lbConfig has no useful information for pick first load balancing
|
||||||
|
|
|
||||||
|
|
@ -30,7 +30,7 @@ import {
|
||||||
UnavailablePicker,
|
UnavailablePicker,
|
||||||
} from './picker';
|
} from './picker';
|
||||||
import { LoadBalancingConfig } from './load-balancing-config';
|
import { LoadBalancingConfig } from './load-balancing-config';
|
||||||
import { Subchannel, ConnectivityStateListener } from './subchannel';
|
import { Subchannel, ConnectivityStateListener, SubchannelAddress } from './subchannel';
|
||||||
|
|
||||||
const TYPE_NAME = 'round_robin';
|
const TYPE_NAME = 'round_robin';
|
||||||
|
|
||||||
|
|
@ -168,7 +168,7 @@ export class RoundRobinLoadBalancer implements LoadBalancer {
|
||||||
}
|
}
|
||||||
|
|
||||||
updateAddressList(
|
updateAddressList(
|
||||||
addressList: string[],
|
addressList: SubchannelAddress[],
|
||||||
lbConfig: LoadBalancingConfig | null
|
lbConfig: LoadBalancingConfig | null
|
||||||
): void {
|
): void {
|
||||||
this.resetSubchannelList();
|
this.resetSubchannelList();
|
||||||
|
|
|
||||||
|
|
@ -16,7 +16,7 @@
|
||||||
*/
|
*/
|
||||||
|
|
||||||
import { ChannelOptions } from './channel-options';
|
import { ChannelOptions } from './channel-options';
|
||||||
import { Subchannel } from './subchannel';
|
import { Subchannel, SubchannelAddress } from './subchannel';
|
||||||
import { ConnectivityState } from './channel';
|
import { ConnectivityState } from './channel';
|
||||||
import { Picker } from './picker';
|
import { Picker } from './picker';
|
||||||
import { LoadBalancingConfig } from './load-balancing-config';
|
import { LoadBalancingConfig } from './load-balancing-config';
|
||||||
|
|
@ -34,7 +34,7 @@ export interface ChannelControlHelper {
|
||||||
* @param subchannelArgs Extra channel arguments specified by the load balancer
|
* @param subchannelArgs Extra channel arguments specified by the load balancer
|
||||||
*/
|
*/
|
||||||
createSubchannel(
|
createSubchannel(
|
||||||
subchannelAddress: string,
|
subchannelAddress: SubchannelAddress,
|
||||||
subchannelArgs: ChannelOptions
|
subchannelArgs: ChannelOptions
|
||||||
): Subchannel;
|
): Subchannel;
|
||||||
/**
|
/**
|
||||||
|
|
@ -66,7 +66,7 @@ export interface LoadBalancer {
|
||||||
* if one was provided
|
* if one was provided
|
||||||
*/
|
*/
|
||||||
updateAddressList(
|
updateAddressList(
|
||||||
addressList: string[],
|
addressList: SubchannelAddress[],
|
||||||
lbConfig: LoadBalancingConfig | null
|
lbConfig: LoadBalancingConfig | null
|
||||||
): void;
|
): void;
|
||||||
/**
|
/**
|
||||||
|
|
|
||||||
|
|
@ -30,6 +30,7 @@ import { StatusObject } from './call-stream';
|
||||||
import { Metadata } from './metadata';
|
import { Metadata } from './metadata';
|
||||||
import * as logging from './logging';
|
import * as logging from './logging';
|
||||||
import { LogVerbosity } from './constants';
|
import { LogVerbosity } from './constants';
|
||||||
|
import { SubchannelAddress } from './subchannel';
|
||||||
|
|
||||||
const TRACER_NAME = 'dns_resolver';
|
const TRACER_NAME = 'dns_resolver';
|
||||||
|
|
||||||
|
|
@ -112,7 +113,7 @@ const dnsLookupPromise = util.promisify(dns.lookup);
|
||||||
* @param target
|
* @param target
|
||||||
* @return An "IP:port" string in an array if parsing was successful, `null` otherwise
|
* @return An "IP:port" string in an array if parsing was successful, `null` otherwise
|
||||||
*/
|
*/
|
||||||
function parseIP(target: string): string[] | null {
|
function parseIP(target: string): SubchannelAddress[] | null {
|
||||||
/* These three regular expressions are all mutually exclusive, so we just
|
/* These three regular expressions are all mutually exclusive, so we just
|
||||||
* want the first one that matches the target string, if any do. */
|
* want the first one that matches the target string, if any do. */
|
||||||
const ipv4Match = IPV4_REGEX.exec(target);
|
const ipv4Match = IPV4_REGEX.exec(target);
|
||||||
|
|
@ -123,14 +124,14 @@ function parseIP(target: string): string[] | null {
|
||||||
}
|
}
|
||||||
|
|
||||||
// ipv6 addresses should be bracketed
|
// ipv6 addresses should be bracketed
|
||||||
const addr = ipv4Match ? match[1] : `[${match[1]}]`;
|
const addr = match[1];
|
||||||
let port: string;
|
let port: string;
|
||||||
if (match[2]) {
|
if (match[2]) {
|
||||||
port = match[2];
|
port = match[2];
|
||||||
} else {
|
} else {
|
||||||
port = DEFAULT_PORT;
|
port = DEFAULT_PORT;
|
||||||
}
|
}
|
||||||
return [`${addr}:${port}`];
|
return [{host: addr, port: +port}];
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -161,7 +162,7 @@ function mergeArrays<T>(...arrays: T[][]): T[] {
|
||||||
* Resolver implementation that handles DNS names and IP addresses.
|
* Resolver implementation that handles DNS names and IP addresses.
|
||||||
*/
|
*/
|
||||||
class DnsResolver implements Resolver {
|
class DnsResolver implements Resolver {
|
||||||
private readonly ipResult: string[] | null;
|
private readonly ipResult: SubchannelAddress[] | null;
|
||||||
private readonly dnsHostname: string | null;
|
private readonly dnsHostname: string | null;
|
||||||
private readonly port: string | null;
|
private readonly port: string | null;
|
||||||
/* The promise results here contain, in order, the A record, the AAAA record,
|
/* The promise results here contain, in order, the A record, the AAAA record,
|
||||||
|
|
@ -222,23 +223,21 @@ class DnsResolver implements Resolver {
|
||||||
this.pendingResultPromise.then(
|
this.pendingResultPromise.then(
|
||||||
([addressList, txtRecord]) => {
|
([addressList, txtRecord]) => {
|
||||||
this.pendingResultPromise = null;
|
this.pendingResultPromise = null;
|
||||||
const ip4Addresses: string[] = addressList
|
const ip4Addresses: dns.LookupAddress[] = addressList
|
||||||
.filter(addr => addr.family === 4)
|
.filter(addr => addr.family === 4);
|
||||||
.map(addr => `${addr.address}:${this.port}`);
|
let ip6Addresses: dns.LookupAddress[];
|
||||||
let ip6Addresses: string[];
|
|
||||||
if (semver.satisfies(process.version, IPV6_SUPPORT_RANGE)) {
|
if (semver.satisfies(process.version, IPV6_SUPPORT_RANGE)) {
|
||||||
ip6Addresses = addressList
|
ip6Addresses = addressList.filter(addr => addr.family === 6);
|
||||||
.filter(addr => addr.family === 6)
|
|
||||||
.map(addr => `[${addr.address}]:${this.port}`);
|
|
||||||
} else {
|
} else {
|
||||||
ip6Addresses = [];
|
ip6Addresses = [];
|
||||||
}
|
}
|
||||||
const allAddresses: string[] = mergeArrays(
|
const allAddresses: SubchannelAddress[] = mergeArrays(
|
||||||
ip4Addresses,
|
ip4Addresses,
|
||||||
ip6Addresses
|
ip6Addresses
|
||||||
);
|
).map(addr => {return {host: addr.address, port: +this.port!};});
|
||||||
|
const allAddressesString: string = '[' + allAddresses.map(addr => addr.host + ':' + addr.port).join(',') + ']';
|
||||||
trace(
|
trace(
|
||||||
'Resolved addresses for target ' + this.target + ': ' + allAddresses
|
'Resolved addresses for target ' + this.target + ': ' + allAddressesString
|
||||||
);
|
);
|
||||||
if (allAddresses.length === 0) {
|
if (allAddresses.length === 0) {
|
||||||
this.listener.onError(this.defaultResolutionError);
|
this.listener.onError(this.defaultResolutionError);
|
||||||
|
|
|
||||||
|
|
@ -20,6 +20,7 @@ import {
|
||||||
registerResolver,
|
registerResolver,
|
||||||
registerDefaultResolver,
|
registerDefaultResolver,
|
||||||
} from './resolver';
|
} from './resolver';
|
||||||
|
import { SubchannelAddress } from './subchannel';
|
||||||
|
|
||||||
function getUdsName(target: string): string {
|
function getUdsName(target: string): string {
|
||||||
/* Due to how this resolver is registered, it should only be constructed
|
/* Due to how this resolver is registered, it should only be constructed
|
||||||
|
|
@ -34,9 +35,9 @@ function getUdsName(target: string): string {
|
||||||
}
|
}
|
||||||
|
|
||||||
class UdsResolver implements Resolver {
|
class UdsResolver implements Resolver {
|
||||||
private addresses: string[] = [];
|
private addresses: SubchannelAddress[] = [];
|
||||||
constructor(target: string, private listener: ResolverListener) {
|
constructor(target: string, private listener: ResolverListener) {
|
||||||
this.addresses = [getUdsName(target)];
|
this.addresses = [{path: getUdsName(target)}];
|
||||||
}
|
}
|
||||||
updateResolution(): void {
|
updateResolution(): void {
|
||||||
process.nextTick(
|
process.nextTick(
|
||||||
|
|
|
||||||
|
|
@ -20,6 +20,7 @@ import { ServiceConfig } from './service-config';
|
||||||
import * as resolver_dns from './resolver-dns';
|
import * as resolver_dns from './resolver-dns';
|
||||||
import * as resolver_uds from './resolver-uds';
|
import * as resolver_uds from './resolver-uds';
|
||||||
import { StatusObject } from './call-stream';
|
import { StatusObject } from './call-stream';
|
||||||
|
import { SubchannelAddress } from './subchannel';
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A listener object passed to the resolver's constructor that provides name
|
* A listener object passed to the resolver's constructor that provides name
|
||||||
|
|
@ -36,7 +37,7 @@ export interface ResolverListener {
|
||||||
* service configuration was invalid
|
* service configuration was invalid
|
||||||
*/
|
*/
|
||||||
onSuccessfulResolution(
|
onSuccessfulResolution(
|
||||||
addressList: string[],
|
addressList: SubchannelAddress[],
|
||||||
serviceConfig: ServiceConfig | null,
|
serviceConfig: ServiceConfig | null,
|
||||||
serviceConfigError: StatusObject | null
|
serviceConfigError: StatusObject | null
|
||||||
): void;
|
): void;
|
||||||
|
|
|
||||||
|
|
@ -34,6 +34,7 @@ import { StatusObject } from './call-stream';
|
||||||
import { Metadata } from './metadata';
|
import { Metadata } from './metadata';
|
||||||
import * as logging from './logging';
|
import * as logging from './logging';
|
||||||
import { LogVerbosity } from './constants';
|
import { LogVerbosity } from './constants';
|
||||||
|
import { SubchannelAddress } from './subchannel';
|
||||||
|
|
||||||
const TRACER_NAME = 'resolving_load_balancer';
|
const TRACER_NAME = 'resolving_load_balancer';
|
||||||
|
|
||||||
|
|
@ -132,7 +133,7 @@ export class ResolvingLoadBalancer implements LoadBalancer {
|
||||||
this.updateState(ConnectivityState.IDLE, new QueuePicker(this));
|
this.updateState(ConnectivityState.IDLE, new QueuePicker(this));
|
||||||
this.innerResolver = createResolver(target, {
|
this.innerResolver = createResolver(target, {
|
||||||
onSuccessfulResolution: (
|
onSuccessfulResolution: (
|
||||||
addressList: string[],
|
addressList: SubchannelAddress[],
|
||||||
serviceConfig: ServiceConfig | null,
|
serviceConfig: ServiceConfig | null,
|
||||||
serviceConfigError: ServiceError | null
|
serviceConfigError: ServiceError | null
|
||||||
) => {
|
) => {
|
||||||
|
|
@ -243,7 +244,7 @@ export class ResolvingLoadBalancer implements LoadBalancer {
|
||||||
|
|
||||||
this.innerChannelControlHelper = {
|
this.innerChannelControlHelper = {
|
||||||
createSubchannel: (
|
createSubchannel: (
|
||||||
subchannelAddress: string,
|
subchannelAddress: SubchannelAddress,
|
||||||
subchannelArgs: ChannelOptions
|
subchannelArgs: ChannelOptions
|
||||||
) => {
|
) => {
|
||||||
return this.channelControlHelper.createSubchannel(
|
return this.channelControlHelper.createSubchannel(
|
||||||
|
|
@ -289,7 +290,7 @@ export class ResolvingLoadBalancer implements LoadBalancer {
|
||||||
|
|
||||||
this.replacementChannelControlHelper = {
|
this.replacementChannelControlHelper = {
|
||||||
createSubchannel: (
|
createSubchannel: (
|
||||||
subchannelAddress: string,
|
subchannelAddress: SubchannelAddress,
|
||||||
subchannelArgs: ChannelOptions
|
subchannelArgs: ChannelOptions
|
||||||
) => {
|
) => {
|
||||||
return this.channelControlHelper.createSubchannel(
|
return this.channelControlHelper.createSubchannel(
|
||||||
|
|
@ -409,7 +410,7 @@ export class ResolvingLoadBalancer implements LoadBalancer {
|
||||||
}
|
}
|
||||||
|
|
||||||
updateAddressList(
|
updateAddressList(
|
||||||
addressList: string[],
|
addressList: SubchannelAddress[],
|
||||||
lbConfig: LoadBalancingConfig | null
|
lbConfig: LoadBalancingConfig | null
|
||||||
) {
|
) {
|
||||||
throw new Error('updateAddressList not supported on ResolvingLoadBalancer');
|
throw new Error('updateAddressList not supported on ResolvingLoadBalancer');
|
||||||
|
|
|
||||||
|
|
@ -16,7 +16,7 @@
|
||||||
*/
|
*/
|
||||||
|
|
||||||
import { ChannelOptions, channelOptionsEqual } from './channel-options';
|
import { ChannelOptions, channelOptionsEqual } from './channel-options';
|
||||||
import { Subchannel } from './subchannel';
|
import { Subchannel, SubchannelAddress, subchannelAddressEqual } from './subchannel';
|
||||||
import { ChannelCredentials } from './channel-credentials';
|
import { ChannelCredentials } from './channel-credentials';
|
||||||
|
|
||||||
// 10 seconds in milliseconds. This value is arbitrary.
|
// 10 seconds in milliseconds. This value is arbitrary.
|
||||||
|
|
@ -28,13 +28,12 @@ const REF_CHECK_INTERVAL = 10_000;
|
||||||
|
|
||||||
export class SubchannelPool {
|
export class SubchannelPool {
|
||||||
private pool: {
|
private pool: {
|
||||||
[channelTarget: string]: {
|
[channelTarget: string]: Array<{
|
||||||
[subchannelTarget: string]: Array<{
|
subchannelAddress: SubchannelAddress;
|
||||||
channelArguments: ChannelOptions;
|
channelArguments: ChannelOptions;
|
||||||
channelCredentials: ChannelCredentials;
|
channelCredentials: ChannelCredentials;
|
||||||
subchannel: Subchannel;
|
subchannel: Subchannel;
|
||||||
}>;
|
}>;
|
||||||
};
|
|
||||||
} = Object.create(null);
|
} = Object.create(null);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -62,9 +61,7 @@ export class SubchannelPool {
|
||||||
* do not need to be filtered */
|
* do not need to be filtered */
|
||||||
// tslint:disable-next-line:forin
|
// tslint:disable-next-line:forin
|
||||||
for (const channelTarget in this.pool) {
|
for (const channelTarget in this.pool) {
|
||||||
// tslint:disable-next-line:forin
|
const subchannelObjArray = this.pool[channelTarget];
|
||||||
for (const subchannelTarget in this.pool[channelTarget]) {
|
|
||||||
const subchannelObjArray = this.pool[channelTarget][subchannelTarget];
|
|
||||||
|
|
||||||
const refedSubchannels = subchannelObjArray.filter(
|
const refedSubchannels = subchannelObjArray.filter(
|
||||||
value => !value.subchannel.unrefIfOneRef()
|
value => !value.subchannel.unrefIfOneRef()
|
||||||
|
|
@ -77,8 +74,7 @@ export class SubchannelPool {
|
||||||
/* For each subchannel in the pool, try to unref it if it has
|
/* For each subchannel in the pool, try to unref it if it has
|
||||||
* exactly one ref (which is the ref from the pool itself). If that
|
* exactly one ref (which is the ref from the pool itself). If that
|
||||||
* does happen, remove the subchannel from the pool */
|
* does happen, remove the subchannel from the pool */
|
||||||
this.pool[channelTarget][subchannelTarget] = refedSubchannels;
|
this.pool[channelTarget] = refedSubchannels;
|
||||||
}
|
|
||||||
}
|
}
|
||||||
/* Currently we do not delete keys with empty values. If that results
|
/* Currently we do not delete keys with empty values. If that results
|
||||||
* in significant memory usage we should change it. */
|
* in significant memory usage we should change it. */
|
||||||
|
|
@ -114,17 +110,17 @@ export class SubchannelPool {
|
||||||
*/
|
*/
|
||||||
getOrCreateSubchannel(
|
getOrCreateSubchannel(
|
||||||
channelTarget: string,
|
channelTarget: string,
|
||||||
subchannelTarget: string,
|
subchannelTarget: SubchannelAddress,
|
||||||
channelArguments: ChannelOptions,
|
channelArguments: ChannelOptions,
|
||||||
channelCredentials: ChannelCredentials
|
channelCredentials: ChannelCredentials
|
||||||
): Subchannel {
|
): Subchannel {
|
||||||
this.ensureCleanupTask();
|
this.ensureCleanupTask();
|
||||||
|
|
||||||
if (channelTarget in this.pool) {
|
if (channelTarget in this.pool) {
|
||||||
if (subchannelTarget in this.pool[channelTarget]) {
|
const subchannelObjArray = this.pool[channelTarget];
|
||||||
const subchannelObjArray = this.pool[channelTarget][subchannelTarget];
|
|
||||||
for (const subchannelObj of subchannelObjArray) {
|
for (const subchannelObj of subchannelObjArray) {
|
||||||
if (
|
if (
|
||||||
|
subchannelAddressEqual(subchannelTarget, subchannelObj.subchannelAddress) &&
|
||||||
channelOptionsEqual(
|
channelOptionsEqual(
|
||||||
channelArguments,
|
channelArguments,
|
||||||
subchannelObj.channelArguments
|
subchannelObj.channelArguments
|
||||||
|
|
@ -135,7 +131,6 @@ export class SubchannelPool {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
// If we get here, no matching subchannel was found
|
// If we get here, no matching subchannel was found
|
||||||
const subchannel = new Subchannel(
|
const subchannel = new Subchannel(
|
||||||
channelTarget,
|
channelTarget,
|
||||||
|
|
@ -144,12 +139,10 @@ export class SubchannelPool {
|
||||||
channelCredentials
|
channelCredentials
|
||||||
);
|
);
|
||||||
if (!(channelTarget in this.pool)) {
|
if (!(channelTarget in this.pool)) {
|
||||||
this.pool[channelTarget] = Object.create(null);
|
this.pool[channelTarget] = [];
|
||||||
}
|
}
|
||||||
if (!(subchannelTarget in this.pool[channelTarget])) {
|
this.pool[channelTarget].push({
|
||||||
this.pool[channelTarget][subchannelTarget] = [];
|
subchannelAddress: subchannelTarget,
|
||||||
}
|
|
||||||
this.pool[channelTarget][subchannelTarget].push({
|
|
||||||
channelArguments,
|
channelArguments,
|
||||||
channelCredentials,
|
channelCredentials,
|
||||||
subchannel,
|
subchannel,
|
||||||
|
|
|
||||||
|
|
@ -20,12 +20,13 @@ import { ChannelCredentials } from './channel-credentials';
|
||||||
import { Metadata } from './metadata';
|
import { Metadata } from './metadata';
|
||||||
import { Http2CallStream } from './call-stream';
|
import { Http2CallStream } from './call-stream';
|
||||||
import { ChannelOptions } from './channel-options';
|
import { ChannelOptions } from './channel-options';
|
||||||
import { PeerCertificate, checkServerIdentity } from 'tls';
|
import { PeerCertificate, checkServerIdentity, TLSSocket } from 'tls';
|
||||||
import { ConnectivityState } from './channel';
|
import { ConnectivityState } from './channel';
|
||||||
import { BackoffTimeout, BackoffOptions } from './backoff-timeout';
|
import { BackoffTimeout, BackoffOptions } from './backoff-timeout';
|
||||||
import { getDefaultAuthority } from './resolver';
|
import { getDefaultAuthority } from './resolver';
|
||||||
import * as logging from './logging';
|
import * as logging from './logging';
|
||||||
import { LogVerbosity } from './constants';
|
import { LogVerbosity } from './constants';
|
||||||
|
import { SocketConnectOpts } from 'net';
|
||||||
|
|
||||||
const { version: clientVersion } = require('../../package.json');
|
const { version: clientVersion } = require('../../package.json');
|
||||||
|
|
||||||
|
|
@ -73,6 +74,22 @@ function uniformRandom(min: number, max: number) {
|
||||||
|
|
||||||
const tooManyPingsData: Buffer = Buffer.from('too_many_pings', 'ascii');
|
const tooManyPingsData: Buffer = Buffer.from('too_many_pings', 'ascii');
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This represents a single backend address to connect to. This interface is a
|
||||||
|
* subset of net.SocketConnectOpts, i.e. the options described at
|
||||||
|
* https://nodejs.org/api/net.html#net_socket_connect_options_connectlistener.
|
||||||
|
* Those are in turn a subset of the options that can be passed to http2.connect.
|
||||||
|
*/
|
||||||
|
export interface SubchannelAddress {
|
||||||
|
port?: number;
|
||||||
|
host?: string;
|
||||||
|
path?: string;
|
||||||
|
}
|
||||||
|
|
||||||
|
export function subchannelAddressEqual(address1: SubchannelAddress, address2: SubchannelAddress) : boolean {
|
||||||
|
return address1.port === address2.port && address1.host === address2.host && address1.path === address2.path;
|
||||||
|
}
|
||||||
|
|
||||||
export class Subchannel {
|
export class Subchannel {
|
||||||
/**
|
/**
|
||||||
* The subchannel's current connectivity state. Invariant: `session` === `null`
|
* The subchannel's current connectivity state. Invariant: `session` === `null`
|
||||||
|
|
@ -135,6 +152,11 @@ export class Subchannel {
|
||||||
*/
|
*/
|
||||||
private refcount = 0;
|
private refcount = 0;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A string representation of the subchannel address, for logging/tracing
|
||||||
|
*/
|
||||||
|
private subchannelAddressString: string;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A class representing a connection to a single backend.
|
* A class representing a connection to a single backend.
|
||||||
* @param channelTarget The target string for the channel as a whole
|
* @param channelTarget The target string for the channel as a whole
|
||||||
|
|
@ -147,7 +169,7 @@ export class Subchannel {
|
||||||
*/
|
*/
|
||||||
constructor(
|
constructor(
|
||||||
private channelTarget: string,
|
private channelTarget: string,
|
||||||
private subchannelAddress: string,
|
private subchannelAddress: SubchannelAddress,
|
||||||
private options: ChannelOptions,
|
private options: ChannelOptions,
|
||||||
private credentials: ChannelCredentials
|
private credentials: ChannelCredentials
|
||||||
) {
|
) {
|
||||||
|
|
@ -187,6 +209,11 @@ export class Subchannel {
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
}, backoffOptions);
|
}, backoffOptions);
|
||||||
|
if (subchannelAddress.host || subchannelAddress.port) {
|
||||||
|
this.subchannelAddressString = `${subchannelAddress.host}:${subchannelAddress.port}`;
|
||||||
|
} else {
|
||||||
|
this.subchannelAddressString = `${subchannelAddress.path}`;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -225,10 +252,11 @@ export class Subchannel {
|
||||||
}
|
}
|
||||||
|
|
||||||
private startConnectingInternal() {
|
private startConnectingInternal() {
|
||||||
const connectionOptions: http2.SecureClientSessionOptions =
|
let connectionOptions: http2.SecureClientSessionOptions =
|
||||||
this.credentials._getConnectionOptions() || {};
|
this.credentials._getConnectionOptions() || {};
|
||||||
let addressScheme = 'http://';
|
let addressScheme = 'http://';
|
||||||
if ('secureContext' in connectionOptions) {
|
if ('secureContext' in connectionOptions) {
|
||||||
|
connectionOptions.protocol = 'https:';
|
||||||
addressScheme = 'https://';
|
addressScheme = 'https://';
|
||||||
// If provided, the value of grpc.ssl_target_name_override should be used
|
// If provided, the value of grpc.ssl_target_name_override should be used
|
||||||
// to override the target hostname when checking server identity.
|
// to override the target hostname when checking server identity.
|
||||||
|
|
@ -248,8 +276,21 @@ export class Subchannel {
|
||||||
connectionOptions.servername = getDefaultAuthority(this.channelTarget);
|
connectionOptions.servername = getDefaultAuthority(this.channelTarget);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
connectionOptions = Object.assign(connectionOptions, this.subchannelAddress);
|
||||||
|
/* http2.connect uses the options here:
|
||||||
|
* https://github.com/nodejs/node/blob/70c32a6d190e2b5d7b9ff9d5b6a459d14e8b7d59/lib/internal/http2/core.js#L3028-L3036
|
||||||
|
* The spread operator overides earlier values with later ones, so any port
|
||||||
|
* or host values in the options will be used rather than any values extracted
|
||||||
|
* from the first argument. In addition, the path overrides the host and port,
|
||||||
|
* as documented for plaintext connections here:
|
||||||
|
* https://nodejs.org/api/net.html#net_socket_connect_options_connectlistener
|
||||||
|
* and for TLS connections here:
|
||||||
|
* https://nodejs.org/api/tls.html#tls_tls_connect_options_callback.
|
||||||
|
* The first argument just needs to be parseable as a URL and the scheme
|
||||||
|
* determines whether the connection will be established over TLS or not.
|
||||||
|
*/
|
||||||
const session = http2.connect(
|
const session = http2.connect(
|
||||||
addressScheme + this.subchannelAddress,
|
addressScheme + getDefaultAuthority(this.channelTarget),
|
||||||
connectionOptions
|
connectionOptions
|
||||||
);
|
);
|
||||||
this.session = session;
|
this.session = session;
|
||||||
|
|
@ -328,7 +369,7 @@ export class Subchannel {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
trace(
|
trace(
|
||||||
this.subchannelAddress +
|
this.subchannelAddressString +
|
||||||
' ' +
|
' ' +
|
||||||
ConnectivityState[this.connectivityState] +
|
ConnectivityState[this.connectivityState] +
|
||||||
' -> ' +
|
' -> ' +
|
||||||
|
|
@ -400,7 +441,7 @@ export class Subchannel {
|
||||||
|
|
||||||
callRef() {
|
callRef() {
|
||||||
trace(
|
trace(
|
||||||
this.subchannelAddress +
|
this.subchannelAddressString +
|
||||||
' callRefcount ' +
|
' callRefcount ' +
|
||||||
this.callRefcount +
|
this.callRefcount +
|
||||||
' -> ' +
|
' -> ' +
|
||||||
|
|
@ -417,7 +458,7 @@ export class Subchannel {
|
||||||
|
|
||||||
callUnref() {
|
callUnref() {
|
||||||
trace(
|
trace(
|
||||||
this.subchannelAddress +
|
this.subchannelAddressString +
|
||||||
' callRefcount ' +
|
' callRefcount ' +
|
||||||
this.callRefcount +
|
this.callRefcount +
|
||||||
' -> ' +
|
' -> ' +
|
||||||
|
|
@ -435,7 +476,7 @@ export class Subchannel {
|
||||||
|
|
||||||
ref() {
|
ref() {
|
||||||
trace(
|
trace(
|
||||||
this.subchannelAddress +
|
this.subchannelAddressString +
|
||||||
' callRefcount ' +
|
' callRefcount ' +
|
||||||
this.refcount +
|
this.refcount +
|
||||||
' -> ' +
|
' -> ' +
|
||||||
|
|
@ -446,7 +487,7 @@ export class Subchannel {
|
||||||
|
|
||||||
unref() {
|
unref() {
|
||||||
trace(
|
trace(
|
||||||
this.subchannelAddress +
|
this.subchannelAddressString +
|
||||||
' callRefcount ' +
|
' callRefcount ' +
|
||||||
this.refcount +
|
this.refcount +
|
||||||
' -> ' +
|
' -> ' +
|
||||||
|
|
@ -557,6 +598,6 @@ export class Subchannel {
|
||||||
}
|
}
|
||||||
|
|
||||||
getAddress(): string {
|
getAddress(): string {
|
||||||
return this.subchannelAddress;
|
return this.subchannelAddressString;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -21,6 +21,7 @@ import * as assert from 'assert';
|
||||||
import * as resolverManager from '../src/resolver';
|
import * as resolverManager from '../src/resolver';
|
||||||
import { ServiceConfig } from '../src/service-config';
|
import { ServiceConfig } from '../src/service-config';
|
||||||
import { StatusObject } from '../src/call-stream';
|
import { StatusObject } from '../src/call-stream';
|
||||||
|
import { SubchannelAddress } from '../src/subchannel';
|
||||||
|
|
||||||
describe('Name Resolver', () => {
|
describe('Name Resolver', () => {
|
||||||
describe('DNS Names', function() {
|
describe('DNS Names', function() {
|
||||||
|
|
@ -33,11 +34,11 @@ describe('Name Resolver', () => {
|
||||||
const target = 'localhost:50051';
|
const target = 'localhost:50051';
|
||||||
const listener: resolverManager.ResolverListener = {
|
const listener: resolverManager.ResolverListener = {
|
||||||
onSuccessfulResolution: (
|
onSuccessfulResolution: (
|
||||||
addressList: string[],
|
addressList: SubchannelAddress[],
|
||||||
serviceConfig: ServiceConfig | null,
|
serviceConfig: ServiceConfig | null,
|
||||||
serviceConfigError: StatusObject | null
|
serviceConfigError: StatusObject | null
|
||||||
) => {
|
) => {
|
||||||
assert(addressList.includes('127.0.0.1:50051'));
|
assert(addressList.some(addr => addr.host === '127.0.0.1' && addr.port === 50051));
|
||||||
// We would check for the IPv6 address but it needs to be omitted on some Node versions
|
// We would check for the IPv6 address but it needs to be omitted on some Node versions
|
||||||
done();
|
done();
|
||||||
},
|
},
|
||||||
|
|
@ -52,11 +53,11 @@ describe('Name Resolver', () => {
|
||||||
const target = 'localhost';
|
const target = 'localhost';
|
||||||
const listener: resolverManager.ResolverListener = {
|
const listener: resolverManager.ResolverListener = {
|
||||||
onSuccessfulResolution: (
|
onSuccessfulResolution: (
|
||||||
addressList: string[],
|
addressList: SubchannelAddress[],
|
||||||
serviceConfig: ServiceConfig | null,
|
serviceConfig: ServiceConfig | null,
|
||||||
serviceConfigError: StatusObject | null
|
serviceConfigError: StatusObject | null
|
||||||
) => {
|
) => {
|
||||||
assert(addressList.includes('127.0.0.1:443'));
|
assert(addressList.some(addr => addr.host === '127.0.0.1' && addr.port === 443));
|
||||||
// We would check for the IPv6 address but it needs to be omitted on some Node versions
|
// We would check for the IPv6 address but it needs to be omitted on some Node versions
|
||||||
done();
|
done();
|
||||||
},
|
},
|
||||||
|
|
@ -71,11 +72,11 @@ describe('Name Resolver', () => {
|
||||||
const target = '1.2.3.4';
|
const target = '1.2.3.4';
|
||||||
const listener: resolverManager.ResolverListener = {
|
const listener: resolverManager.ResolverListener = {
|
||||||
onSuccessfulResolution: (
|
onSuccessfulResolution: (
|
||||||
addressList: string[],
|
addressList: SubchannelAddress[],
|
||||||
serviceConfig: ServiceConfig | null,
|
serviceConfig: ServiceConfig | null,
|
||||||
serviceConfigError: StatusObject | null
|
serviceConfigError: StatusObject | null
|
||||||
) => {
|
) => {
|
||||||
assert(addressList.includes('1.2.3.4:443'));
|
assert(addressList.some(addr => addr.host === '1.2.3.4' && addr.port === 443));
|
||||||
// We would check for the IPv6 address but it needs to be omitted on some Node versions
|
// We would check for the IPv6 address but it needs to be omitted on some Node versions
|
||||||
done();
|
done();
|
||||||
},
|
},
|
||||||
|
|
@ -90,11 +91,11 @@ describe('Name Resolver', () => {
|
||||||
const target = '::1';
|
const target = '::1';
|
||||||
const listener: resolverManager.ResolverListener = {
|
const listener: resolverManager.ResolverListener = {
|
||||||
onSuccessfulResolution: (
|
onSuccessfulResolution: (
|
||||||
addressList: string[],
|
addressList: SubchannelAddress[],
|
||||||
serviceConfig: ServiceConfig | null,
|
serviceConfig: ServiceConfig | null,
|
||||||
serviceConfigError: StatusObject | null
|
serviceConfigError: StatusObject | null
|
||||||
) => {
|
) => {
|
||||||
assert(addressList.includes('[::1]:443'));
|
assert(addressList.some(addr => addr.host === '::1' && addr.port === 443));
|
||||||
// We would check for the IPv6 address but it needs to be omitted on some Node versions
|
// We would check for the IPv6 address but it needs to be omitted on some Node versions
|
||||||
done();
|
done();
|
||||||
},
|
},
|
||||||
|
|
@ -109,11 +110,11 @@ describe('Name Resolver', () => {
|
||||||
const target = '[::1]:50051';
|
const target = '[::1]:50051';
|
||||||
const listener: resolverManager.ResolverListener = {
|
const listener: resolverManager.ResolverListener = {
|
||||||
onSuccessfulResolution: (
|
onSuccessfulResolution: (
|
||||||
addressList: string[],
|
addressList: SubchannelAddress[],
|
||||||
serviceConfig: ServiceConfig | null,
|
serviceConfig: ServiceConfig | null,
|
||||||
serviceConfigError: StatusObject | null
|
serviceConfigError: StatusObject | null
|
||||||
) => {
|
) => {
|
||||||
assert(addressList.includes('[::1]:50051'));
|
assert(addressList.some(addr => addr.host === '::1' && addr.port === 50051));
|
||||||
// We would check for the IPv6 address but it needs to be omitted on some Node versions
|
// We would check for the IPv6 address but it needs to be omitted on some Node versions
|
||||||
done();
|
done();
|
||||||
},
|
},
|
||||||
|
|
@ -128,7 +129,7 @@ describe('Name Resolver', () => {
|
||||||
const target = 'example.com';
|
const target = 'example.com';
|
||||||
const listener: resolverManager.ResolverListener = {
|
const listener: resolverManager.ResolverListener = {
|
||||||
onSuccessfulResolution: (
|
onSuccessfulResolution: (
|
||||||
addressList: string[],
|
addressList: SubchannelAddress[],
|
||||||
serviceConfig: ServiceConfig | null,
|
serviceConfig: ServiceConfig | null,
|
||||||
serviceConfigError: StatusObject | null
|
serviceConfigError: StatusObject | null
|
||||||
) => {
|
) => {
|
||||||
|
|
@ -146,7 +147,7 @@ describe('Name Resolver', () => {
|
||||||
const target = 'loopback4.unittest.grpc.io';
|
const target = 'loopback4.unittest.grpc.io';
|
||||||
const listener: resolverManager.ResolverListener = {
|
const listener: resolverManager.ResolverListener = {
|
||||||
onSuccessfulResolution: (
|
onSuccessfulResolution: (
|
||||||
addressList: string[],
|
addressList: SubchannelAddress[],
|
||||||
serviceConfig: ServiceConfig | null,
|
serviceConfig: ServiceConfig | null,
|
||||||
serviceConfigError: StatusObject | null
|
serviceConfigError: StatusObject | null
|
||||||
) => {
|
) => {
|
||||||
|
|
@ -166,7 +167,7 @@ describe('Name Resolver', () => {
|
||||||
const target = 'network-tools.com';
|
const target = 'network-tools.com';
|
||||||
const listener: resolverManager.ResolverListener = {
|
const listener: resolverManager.ResolverListener = {
|
||||||
onSuccessfulResolution: (
|
onSuccessfulResolution: (
|
||||||
addressList: string[],
|
addressList: SubchannelAddress[],
|
||||||
serviceConfig: ServiceConfig | null,
|
serviceConfig: ServiceConfig | null,
|
||||||
serviceConfigError: StatusObject | null
|
serviceConfigError: StatusObject | null
|
||||||
) => {
|
) => {
|
||||||
|
|
@ -196,7 +197,7 @@ describe('Name Resolver', () => {
|
||||||
const target2 = 'grpc-test4.sandbox.googleapis.com';
|
const target2 = 'grpc-test4.sandbox.googleapis.com';
|
||||||
const listener: resolverManager.ResolverListener = {
|
const listener: resolverManager.ResolverListener = {
|
||||||
onSuccessfulResolution: (
|
onSuccessfulResolution: (
|
||||||
addressList: string[],
|
addressList: SubchannelAddress[],
|
||||||
serviceConfig: ServiceConfig | null,
|
serviceConfig: ServiceConfig | null,
|
||||||
serviceConfigError: StatusObject | null
|
serviceConfigError: StatusObject | null
|
||||||
) => {
|
) => {
|
||||||
|
|
@ -218,11 +219,11 @@ describe('Name Resolver', () => {
|
||||||
const target = 'unix:socket';
|
const target = 'unix:socket';
|
||||||
const listener: resolverManager.ResolverListener = {
|
const listener: resolverManager.ResolverListener = {
|
||||||
onSuccessfulResolution: (
|
onSuccessfulResolution: (
|
||||||
addressList: string[],
|
addressList: SubchannelAddress[],
|
||||||
serviceConfig: ServiceConfig | null,
|
serviceConfig: ServiceConfig | null,
|
||||||
serviceConfigError: StatusObject | null
|
serviceConfigError: StatusObject | null
|
||||||
) => {
|
) => {
|
||||||
assert(addressList.includes('socket'));
|
assert(addressList.some(addr => addr.path = 'socket'));
|
||||||
done();
|
done();
|
||||||
},
|
},
|
||||||
onError: (error: StatusObject) => {
|
onError: (error: StatusObject) => {
|
||||||
|
|
@ -236,11 +237,11 @@ describe('Name Resolver', () => {
|
||||||
const target = 'unix:///tmp/socket';
|
const target = 'unix:///tmp/socket';
|
||||||
const listener: resolverManager.ResolverListener = {
|
const listener: resolverManager.ResolverListener = {
|
||||||
onSuccessfulResolution: (
|
onSuccessfulResolution: (
|
||||||
addressList: string[],
|
addressList: SubchannelAddress[],
|
||||||
serviceConfig: ServiceConfig | null,
|
serviceConfig: ServiceConfig | null,
|
||||||
serviceConfigError: StatusObject | null
|
serviceConfigError: StatusObject | null
|
||||||
) => {
|
) => {
|
||||||
assert(addressList.includes('/tmp/socket'));
|
assert(addressList.some(addr => addr.path = '/tmp/socket'));
|
||||||
done();
|
done();
|
||||||
},
|
},
|
||||||
onError: (error: StatusObject) => {
|
onError: (error: StatusObject) => {
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue