Merge pull request #1245 from murgatroid99/grpc-js_subchannel_address_object

grpc-js: Use an object to represent subchannel addresses
This commit is contained in:
Michael Lumish 2020-02-04 10:16:28 -08:00 committed by GitHub
commit ea21c4f6ef
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 259 additions and 108 deletions

View File

@ -18,7 +18,7 @@
import { ConnectionOptions, createSecureContext, PeerCertificate } from 'tls'; import { ConnectionOptions, createSecureContext, PeerCertificate } from 'tls';
import { CallCredentials } from './call-credentials'; import { CallCredentials } from './call-credentials';
import {CIPHER_SUITES, getDefaultRootsData} from './tls-helpers'; import { CIPHER_SUITES, getDefaultRootsData } from './tls-helpers';
// tslint:disable-next-line:no-any // tslint:disable-next-line:no-any
function verifyIsBufferOrNull(obj: any, friendlyName: string): void { function verifyIsBufferOrNull(obj: any, friendlyName: string): void {
@ -190,7 +190,7 @@ class SecureChannelCredentialsImpl extends ChannelCredentials {
ca: rootCerts || undefined, ca: rootCerts || undefined,
key: privateKey || undefined, key: privateKey || undefined,
cert: certChain || undefined, cert: certChain || undefined,
ciphers: CIPHER_SUITES ciphers: CIPHER_SUITES,
}); });
this.connectionOptions = { secureContext }; this.connectionOptions = { secureContext };
if (verifyOptions && verifyOptions.checkServerIdentity) { if (verifyOptions && verifyOptions.checkServerIdentity) {

View File

@ -38,6 +38,7 @@ import { getDefaultAuthority } from './resolver';
import { LoadBalancingConfig } from './load-balancing-config'; import { LoadBalancingConfig } from './load-balancing-config';
import { ServiceConfig, validateServiceConfig } from './service-config'; import { ServiceConfig, validateServiceConfig } from './service-config';
import { trace } from './logging'; import { trace } from './logging';
import { SubchannelAddress } from './subchannel';
export enum ConnectivityState { export enum ConnectivityState {
CONNECTING, CONNECTING,
@ -142,10 +143,12 @@ export class ChannelImplementation implements Channel {
) { ) {
/* The global boolean parameter to getSubchannelPool has the inverse meaning to what /* The global boolean parameter to getSubchannelPool has the inverse meaning to what
* the grpc.use_local_subchannel_pool channel option means. */ * the grpc.use_local_subchannel_pool channel option means. */
this.subchannelPool = getSubchannelPool((options['grpc.use_local_subchannel_pool'] ?? 0) === 0); this.subchannelPool = getSubchannelPool(
(options['grpc.use_local_subchannel_pool'] ?? 0) === 0
);
const channelControlHelper: ChannelControlHelper = { const channelControlHelper: ChannelControlHelper = {
createSubchannel: ( createSubchannel: (
subchannelAddress: string, subchannelAddress: SubchannelAddress,
subchannelArgs: ChannelOptions subchannelArgs: ChannelOptions
) => { ) => {
return this.subchannelPool.getOrCreateSubchannel( return this.subchannelPool.getOrCreateSubchannel(

View File

@ -30,7 +30,11 @@ import {
UnavailablePicker, UnavailablePicker,
} from './picker'; } from './picker';
import { LoadBalancingConfig } from './load-balancing-config'; import { LoadBalancingConfig } from './load-balancing-config';
import { Subchannel, ConnectivityStateListener } from './subchannel'; import {
Subchannel,
ConnectivityStateListener,
SubchannelAddress,
} from './subchannel';
import * as logging from './logging'; import * as logging from './logging';
import { LogVerbosity } from './constants'; import { LogVerbosity } from './constants';
@ -76,7 +80,7 @@ export class PickFirstLoadBalancer implements LoadBalancer {
/** /**
* The list of backend addresses most recently passed to `updateAddressList`. * The list of backend addresses most recently passed to `updateAddressList`.
*/ */
private latestAddressList: string[] = []; private latestAddressList: SubchannelAddress[] = [];
/** /**
* The list of subchannels this load balancer is currently attempting to * The list of subchannels this load balancer is currently attempting to
* connect to. * connect to.
@ -369,7 +373,7 @@ export class PickFirstLoadBalancer implements LoadBalancer {
} }
updateAddressList( updateAddressList(
addressList: string[], addressList: SubchannelAddress[],
lbConfig: LoadBalancingConfig | null lbConfig: LoadBalancingConfig | null
): void { ): void {
// lbConfig has no useful information for pick first load balancing // lbConfig has no useful information for pick first load balancing

View File

@ -30,7 +30,11 @@ import {
UnavailablePicker, UnavailablePicker,
} from './picker'; } from './picker';
import { LoadBalancingConfig } from './load-balancing-config'; import { LoadBalancingConfig } from './load-balancing-config';
import { Subchannel, ConnectivityStateListener } from './subchannel'; import {
Subchannel,
ConnectivityStateListener,
SubchannelAddress,
} from './subchannel';
const TYPE_NAME = 'round_robin'; const TYPE_NAME = 'round_robin';
@ -168,7 +172,7 @@ export class RoundRobinLoadBalancer implements LoadBalancer {
} }
updateAddressList( updateAddressList(
addressList: string[], addressList: SubchannelAddress[],
lbConfig: LoadBalancingConfig | null lbConfig: LoadBalancingConfig | null
): void { ): void {
this.resetSubchannelList(); this.resetSubchannelList();

View File

@ -16,7 +16,7 @@
*/ */
import { ChannelOptions } from './channel-options'; import { ChannelOptions } from './channel-options';
import { Subchannel } from './subchannel'; import { Subchannel, SubchannelAddress } from './subchannel';
import { ConnectivityState } from './channel'; import { ConnectivityState } from './channel';
import { Picker } from './picker'; import { Picker } from './picker';
import { LoadBalancingConfig } from './load-balancing-config'; import { LoadBalancingConfig } from './load-balancing-config';
@ -34,7 +34,7 @@ export interface ChannelControlHelper {
* @param subchannelArgs Extra channel arguments specified by the load balancer * @param subchannelArgs Extra channel arguments specified by the load balancer
*/ */
createSubchannel( createSubchannel(
subchannelAddress: string, subchannelAddress: SubchannelAddress,
subchannelArgs: ChannelOptions subchannelArgs: ChannelOptions
): Subchannel; ): Subchannel;
/** /**
@ -66,7 +66,7 @@ export interface LoadBalancer {
* if one was provided * if one was provided
*/ */
updateAddressList( updateAddressList(
addressList: string[], addressList: SubchannelAddress[],
lbConfig: LoadBalancingConfig | null lbConfig: LoadBalancingConfig | null
): void; ): void;
/** /**

View File

@ -30,6 +30,7 @@ import { StatusObject } from './call-stream';
import { Metadata } from './metadata'; import { Metadata } from './metadata';
import * as logging from './logging'; import * as logging from './logging';
import { LogVerbosity } from './constants'; import { LogVerbosity } from './constants';
import { SubchannelAddress, TcpSubchannelAddress } from './subchannel';
const TRACER_NAME = 'dns_resolver'; const TRACER_NAME = 'dns_resolver';
@ -112,7 +113,7 @@ const dnsLookupPromise = util.promisify(dns.lookup);
* @param target * @param target
* @return An "IP:port" string in an array if parsing was successful, `null` otherwise * @return An "IP:port" string in an array if parsing was successful, `null` otherwise
*/ */
function parseIP(target: string): string[] | null { function parseIP(target: string): SubchannelAddress[] | null {
/* These three regular expressions are all mutually exclusive, so we just /* These three regular expressions are all mutually exclusive, so we just
* want the first one that matches the target string, if any do. */ * want the first one that matches the target string, if any do. */
const ipv4Match = IPV4_REGEX.exec(target); const ipv4Match = IPV4_REGEX.exec(target);
@ -123,14 +124,14 @@ function parseIP(target: string): string[] | null {
} }
// ipv6 addresses should be bracketed // ipv6 addresses should be bracketed
const addr = ipv4Match ? match[1] : `[${match[1]}]`; const addr = match[1];
let port: string; let port: string;
if (match[2]) { if (match[2]) {
port = match[2]; port = match[2];
} else { } else {
port = DEFAULT_PORT; port = DEFAULT_PORT;
} }
return [`${addr}:${port}`]; return [{ host: addr, port: +port }];
} }
/** /**
@ -161,7 +162,7 @@ function mergeArrays<T>(...arrays: T[][]): T[] {
* Resolver implementation that handles DNS names and IP addresses. * Resolver implementation that handles DNS names and IP addresses.
*/ */
class DnsResolver implements Resolver { class DnsResolver implements Resolver {
private readonly ipResult: string[] | null; private readonly ipResult: SubchannelAddress[] | null;
private readonly dnsHostname: string | null; private readonly dnsHostname: string | null;
private readonly port: string | null; private readonly port: string | null;
/* The promise results here contain, in order, the A record, the AAAA record, /* The promise results here contain, in order, the A record, the AAAA record,
@ -222,23 +223,28 @@ class DnsResolver implements Resolver {
this.pendingResultPromise.then( this.pendingResultPromise.then(
([addressList, txtRecord]) => { ([addressList, txtRecord]) => {
this.pendingResultPromise = null; this.pendingResultPromise = null;
const ip4Addresses: string[] = addressList const ip4Addresses: dns.LookupAddress[] = addressList.filter(
.filter(addr => addr.family === 4) addr => addr.family === 4
.map(addr => `${addr.address}:${this.port}`); );
let ip6Addresses: string[]; let ip6Addresses: dns.LookupAddress[];
if (semver.satisfies(process.version, IPV6_SUPPORT_RANGE)) { if (semver.satisfies(process.version, IPV6_SUPPORT_RANGE)) {
ip6Addresses = addressList ip6Addresses = addressList.filter(addr => addr.family === 6);
.filter(addr => addr.family === 6)
.map(addr => `[${addr.address}]:${this.port}`);
} else { } else {
ip6Addresses = []; ip6Addresses = [];
} }
const allAddresses: string[] = mergeArrays( const allAddresses: TcpSubchannelAddress[] = mergeArrays(
ip4Addresses, ip4Addresses,
ip6Addresses ip6Addresses
); ).map(addr => ({ host: addr.address, port: +this.port! }));
const allAddressesString: string =
'[' +
allAddresses.map(addr => addr.host + ':' + addr.port).join(',') +
']';
trace( trace(
'Resolved addresses for target ' + this.target + ': ' + allAddresses 'Resolved addresses for target ' +
this.target +
': ' +
allAddressesString
); );
if (allAddresses.length === 0) { if (allAddresses.length === 0) {
this.listener.onError(this.defaultResolutionError); this.listener.onError(this.defaultResolutionError);

View File

@ -20,6 +20,7 @@ import {
registerResolver, registerResolver,
registerDefaultResolver, registerDefaultResolver,
} from './resolver'; } from './resolver';
import { SubchannelAddress } from './subchannel';
function getUdsName(target: string): string { function getUdsName(target: string): string {
/* Due to how this resolver is registered, it should only be constructed /* Due to how this resolver is registered, it should only be constructed
@ -34,9 +35,9 @@ function getUdsName(target: string): string {
} }
class UdsResolver implements Resolver { class UdsResolver implements Resolver {
private addresses: string[] = []; private addresses: SubchannelAddress[] = [];
constructor(target: string, private listener: ResolverListener) { constructor(target: string, private listener: ResolverListener) {
this.addresses = [getUdsName(target)]; this.addresses = [{ path: getUdsName(target) }];
} }
updateResolution(): void { updateResolution(): void {
process.nextTick( process.nextTick(

View File

@ -20,6 +20,7 @@ import { ServiceConfig } from './service-config';
import * as resolver_dns from './resolver-dns'; import * as resolver_dns from './resolver-dns';
import * as resolver_uds from './resolver-uds'; import * as resolver_uds from './resolver-uds';
import { StatusObject } from './call-stream'; import { StatusObject } from './call-stream';
import { SubchannelAddress } from './subchannel';
/** /**
* A listener object passed to the resolver's constructor that provides name * A listener object passed to the resolver's constructor that provides name
@ -36,7 +37,7 @@ export interface ResolverListener {
* service configuration was invalid * service configuration was invalid
*/ */
onSuccessfulResolution( onSuccessfulResolution(
addressList: string[], addressList: SubchannelAddress[],
serviceConfig: ServiceConfig | null, serviceConfig: ServiceConfig | null,
serviceConfigError: StatusObject | null serviceConfigError: StatusObject | null
): void; ): void;

View File

@ -34,6 +34,7 @@ import { StatusObject } from './call-stream';
import { Metadata } from './metadata'; import { Metadata } from './metadata';
import * as logging from './logging'; import * as logging from './logging';
import { LogVerbosity } from './constants'; import { LogVerbosity } from './constants';
import { SubchannelAddress } from './subchannel';
const TRACER_NAME = 'resolving_load_balancer'; const TRACER_NAME = 'resolving_load_balancer';
@ -132,7 +133,7 @@ export class ResolvingLoadBalancer implements LoadBalancer {
this.updateState(ConnectivityState.IDLE, new QueuePicker(this)); this.updateState(ConnectivityState.IDLE, new QueuePicker(this));
this.innerResolver = createResolver(target, { this.innerResolver = createResolver(target, {
onSuccessfulResolution: ( onSuccessfulResolution: (
addressList: string[], addressList: SubchannelAddress[],
serviceConfig: ServiceConfig | null, serviceConfig: ServiceConfig | null,
serviceConfigError: ServiceError | null serviceConfigError: ServiceError | null
) => { ) => {
@ -243,7 +244,7 @@ export class ResolvingLoadBalancer implements LoadBalancer {
this.innerChannelControlHelper = { this.innerChannelControlHelper = {
createSubchannel: ( createSubchannel: (
subchannelAddress: string, subchannelAddress: SubchannelAddress,
subchannelArgs: ChannelOptions subchannelArgs: ChannelOptions
) => { ) => {
return this.channelControlHelper.createSubchannel( return this.channelControlHelper.createSubchannel(
@ -289,7 +290,7 @@ export class ResolvingLoadBalancer implements LoadBalancer {
this.replacementChannelControlHelper = { this.replacementChannelControlHelper = {
createSubchannel: ( createSubchannel: (
subchannelAddress: string, subchannelAddress: SubchannelAddress,
subchannelArgs: ChannelOptions subchannelArgs: ChannelOptions
) => { ) => {
return this.channelControlHelper.createSubchannel( return this.channelControlHelper.createSubchannel(
@ -409,7 +410,7 @@ export class ResolvingLoadBalancer implements LoadBalancer {
} }
updateAddressList( updateAddressList(
addressList: string[], addressList: SubchannelAddress[],
lbConfig: LoadBalancingConfig | null lbConfig: LoadBalancingConfig | null
) { ) {
throw new Error('updateAddressList not supported on ResolvingLoadBalancer'); throw new Error('updateAddressList not supported on ResolvingLoadBalancer');

View File

@ -16,7 +16,7 @@
*/ */
import { SecureServerOptions } from 'http2'; import { SecureServerOptions } from 'http2';
import {CIPHER_SUITES, getDefaultRootsData} from './tls-helpers'; import { CIPHER_SUITES, getDefaultRootsData } from './tls-helpers';
export interface KeyCertPair { export interface KeyCertPair {
private_key: Buffer; private_key: Buffer;
@ -75,7 +75,7 @@ export abstract class ServerCredentials {
cert, cert,
key, key,
requestCert: checkClientCertificate, requestCert: checkClientCertificate,
ciphers: CIPHER_SUITES ciphers: CIPHER_SUITES,
}); });
} }
} }

View File

@ -203,11 +203,16 @@ export class Server {
const options: ListenOptions = { host: url.hostname, port: +url.port }; const options: ListenOptions = { host: url.hostname, port: +url.port };
const serverOptions: http2.ServerOptions = {}; const serverOptions: http2.ServerOptions = {};
if ('grpc.max_concurrent_streams' in this.options) { if ('grpc.max_concurrent_streams' in this.options) {
serverOptions.settings = {maxConcurrentStreams: this.options['grpc.max_concurrent_streams']}; serverOptions.settings = {
maxConcurrentStreams: this.options['grpc.max_concurrent_streams'],
};
} }
if (creds._isSecure()) { if (creds._isSecure()) {
const secureServerOptions = Object.assign(serverOptions, creds._getSettings()!); const secureServerOptions = Object.assign(
serverOptions,
creds._getSettings()!
);
this.http2Server = http2.createSecureServer(secureServerOptions); this.http2Server = http2.createSecureServer(secureServerOptions);
} else { } else {
this.http2Server = http2.createServer(serverOptions); this.http2Server = http2.createServer(serverOptions);

View File

@ -16,7 +16,11 @@
*/ */
import { ChannelOptions, channelOptionsEqual } from './channel-options'; import { ChannelOptions, channelOptionsEqual } from './channel-options';
import { Subchannel } from './subchannel'; import {
Subchannel,
SubchannelAddress,
subchannelAddressEqual,
} from './subchannel';
import { ChannelCredentials } from './channel-credentials'; import { ChannelCredentials } from './channel-credentials';
// 10 seconds in milliseconds. This value is arbitrary. // 10 seconds in milliseconds. This value is arbitrary.
@ -28,13 +32,12 @@ const REF_CHECK_INTERVAL = 10_000;
export class SubchannelPool { export class SubchannelPool {
private pool: { private pool: {
[channelTarget: string]: { [channelTarget: string]: Array<{
[subchannelTarget: string]: Array<{ subchannelAddress: SubchannelAddress;
channelArguments: ChannelOptions; channelArguments: ChannelOptions;
channelCredentials: ChannelCredentials; channelCredentials: ChannelCredentials;
subchannel: Subchannel; subchannel: Subchannel;
}>; }>;
};
} = Object.create(null); } = Object.create(null);
/** /**
@ -62,23 +65,20 @@ export class SubchannelPool {
* do not need to be filtered */ * do not need to be filtered */
// tslint:disable-next-line:forin // tslint:disable-next-line:forin
for (const channelTarget in this.pool) { for (const channelTarget in this.pool) {
// tslint:disable-next-line:forin const subchannelObjArray = this.pool[channelTarget];
for (const subchannelTarget in this.pool[channelTarget]) {
const subchannelObjArray = this.pool[channelTarget][subchannelTarget];
const refedSubchannels = subchannelObjArray.filter( const refedSubchannels = subchannelObjArray.filter(
value => !value.subchannel.unrefIfOneRef() value => !value.subchannel.unrefIfOneRef()
); );
if (refedSubchannels.length > 0) { if (refedSubchannels.length > 0) {
allSubchannelsUnrefed = false; allSubchannelsUnrefed = false;
}
/* For each subchannel in the pool, try to unref it if it has
* exactly one ref (which is the ref from the pool itself). If that
* does happen, remove the subchannel from the pool */
this.pool[channelTarget][subchannelTarget] = refedSubchannels;
} }
/* For each subchannel in the pool, try to unref it if it has
* exactly one ref (which is the ref from the pool itself). If that
* does happen, remove the subchannel from the pool */
this.pool[channelTarget] = refedSubchannels;
} }
/* Currently we do not delete keys with empty values. If that results /* Currently we do not delete keys with empty values. If that results
* in significant memory usage we should change it. */ * in significant memory usage we should change it. */
@ -114,25 +114,27 @@ export class SubchannelPool {
*/ */
getOrCreateSubchannel( getOrCreateSubchannel(
channelTarget: string, channelTarget: string,
subchannelTarget: string, subchannelTarget: SubchannelAddress,
channelArguments: ChannelOptions, channelArguments: ChannelOptions,
channelCredentials: ChannelCredentials channelCredentials: ChannelCredentials
): Subchannel { ): Subchannel {
this.ensureCleanupTask(); this.ensureCleanupTask();
if (channelTarget in this.pool) { if (channelTarget in this.pool) {
if (subchannelTarget in this.pool[channelTarget]) { const subchannelObjArray = this.pool[channelTarget];
const subchannelObjArray = this.pool[channelTarget][subchannelTarget]; for (const subchannelObj of subchannelObjArray) {
for (const subchannelObj of subchannelObjArray) { if (
if ( subchannelAddressEqual(
channelOptionsEqual( subchannelTarget,
channelArguments, subchannelObj.subchannelAddress
subchannelObj.channelArguments ) &&
) && channelOptionsEqual(
channelCredentials._equals(subchannelObj.channelCredentials) channelArguments,
) { subchannelObj.channelArguments
return subchannelObj.subchannel; ) &&
} channelCredentials._equals(subchannelObj.channelCredentials)
) {
return subchannelObj.subchannel;
} }
} }
} }
@ -144,12 +146,10 @@ export class SubchannelPool {
channelCredentials channelCredentials
); );
if (!(channelTarget in this.pool)) { if (!(channelTarget in this.pool)) {
this.pool[channelTarget] = Object.create(null); this.pool[channelTarget] = [];
} }
if (!(subchannelTarget in this.pool[channelTarget])) { this.pool[channelTarget].push({
this.pool[channelTarget][subchannelTarget] = []; subchannelAddress: subchannelTarget,
}
this.pool[channelTarget][subchannelTarget].push({
channelArguments, channelArguments,
channelCredentials, channelCredentials,
subchannel, subchannel,

View File

@ -20,12 +20,13 @@ import { ChannelCredentials } from './channel-credentials';
import { Metadata } from './metadata'; import { Metadata } from './metadata';
import { Http2CallStream } from './call-stream'; import { Http2CallStream } from './call-stream';
import { ChannelOptions } from './channel-options'; import { ChannelOptions } from './channel-options';
import { PeerCertificate, checkServerIdentity } from 'tls'; import { PeerCertificate, checkServerIdentity, TLSSocket } from 'tls';
import { ConnectivityState } from './channel'; import { ConnectivityState } from './channel';
import { BackoffTimeout, BackoffOptions } from './backoff-timeout'; import { BackoffTimeout, BackoffOptions } from './backoff-timeout';
import { getDefaultAuthority } from './resolver'; import { getDefaultAuthority } from './resolver';
import * as logging from './logging'; import * as logging from './logging';
import { LogVerbosity } from './constants'; import { LogVerbosity } from './constants';
import * as net from 'net';
const { version: clientVersion } = require('../../package.json'); const { version: clientVersion } = require('../../package.json');
@ -73,6 +74,44 @@ function uniformRandom(min: number, max: number) {
const tooManyPingsData: Buffer = Buffer.from('too_many_pings', 'ascii'); const tooManyPingsData: Buffer = Buffer.from('too_many_pings', 'ascii');
export interface TcpSubchannelAddress {
port: number;
host: string;
}
export interface IpcSubchannelAddress {
path: string;
}
/**
* This represents a single backend address to connect to. This interface is a
* subset of net.SocketConnectOpts, i.e. the options described at
* https://nodejs.org/api/net.html#net_socket_connect_options_connectlistener.
* Those are in turn a subset of the options that can be passed to http2.connect.
*/
export type SubchannelAddress = TcpSubchannelAddress | IpcSubchannelAddress;
export function isTcpSubchannelAddress(
address: SubchannelAddress
): address is TcpSubchannelAddress {
return 'port' in address;
}
export function subchannelAddressEqual(
address1: SubchannelAddress,
address2: SubchannelAddress
): boolean {
if (isTcpSubchannelAddress(address1)) {
return (
isTcpSubchannelAddress(address2) &&
address1.host === address2.host &&
address1.port === address2.port
);
} else {
return !isTcpSubchannelAddress(address2) && address1.path === address2.path;
}
}
export class Subchannel { export class Subchannel {
/** /**
* The subchannel's current connectivity state. Invariant: `session` === `null` * The subchannel's current connectivity state. Invariant: `session` === `null`
@ -135,6 +174,11 @@ export class Subchannel {
*/ */
private refcount = 0; private refcount = 0;
/**
* A string representation of the subchannel address, for logging/tracing
*/
private subchannelAddressString: string;
/** /**
* A class representing a connection to a single backend. * A class representing a connection to a single backend.
* @param channelTarget The target string for the channel as a whole * @param channelTarget The target string for the channel as a whole
@ -147,7 +191,7 @@ export class Subchannel {
*/ */
constructor( constructor(
private channelTarget: string, private channelTarget: string,
private subchannelAddress: string, private subchannelAddress: SubchannelAddress,
private options: ChannelOptions, private options: ChannelOptions,
private credentials: ChannelCredentials private credentials: ChannelCredentials
) { ) {
@ -172,7 +216,7 @@ export class Subchannel {
clearTimeout(this.keepaliveTimeoutId); clearTimeout(this.keepaliveTimeoutId);
const backoffOptions: BackoffOptions = { const backoffOptions: BackoffOptions = {
initialDelay: options['grpc.initial_reconnect_backoff_ms'], initialDelay: options['grpc.initial_reconnect_backoff_ms'],
maxDelay: options['grpc.max_reconnect_backoff_ms'] maxDelay: options['grpc.max_reconnect_backoff_ms'],
}; };
this.backoffTimeout = new BackoffTimeout(() => { this.backoffTimeout = new BackoffTimeout(() => {
if (this.continueConnecting) { if (this.continueConnecting) {
@ -187,6 +231,11 @@ export class Subchannel {
); );
} }
}, backoffOptions); }, backoffOptions);
if (isTcpSubchannelAddress(subchannelAddress)) {
this.subchannelAddressString = `${subchannelAddress.host}:${subchannelAddress.port}`;
} else {
this.subchannelAddressString = `${subchannelAddress.path}`;
}
} }
/** /**
@ -225,7 +274,7 @@ export class Subchannel {
} }
private startConnectingInternal() { private startConnectingInternal() {
const connectionOptions: http2.SecureClientSessionOptions = let connectionOptions: http2.SecureClientSessionOptions =
this.credentials._getConnectionOptions() || {}; this.credentials._getConnectionOptions() || {};
let addressScheme = 'http://'; let addressScheme = 'http://';
if ('secureContext' in connectionOptions) { if ('secureContext' in connectionOptions) {
@ -247,9 +296,40 @@ export class Subchannel {
} else { } else {
connectionOptions.servername = getDefaultAuthority(this.channelTarget); connectionOptions.servername = getDefaultAuthority(this.channelTarget);
} }
} else {
/* In all but the most recent versions of Node, http2.connect does not use
* the options when establishing plaintext connections, so we need to
* establish that connection explicitly. */
connectionOptions.createConnection = (authority, option) => {
/* net.NetConnectOpts is declared in a way that is more restrictive
* than what net.connect will actually accept, so we use the type
* assertion to work around that. */
return net.connect(this.subchannelAddress as net.NetConnectOpts);
};
} }
connectionOptions = Object.assign(
connectionOptions,
this.subchannelAddress
);
/* http2.connect uses the options here:
* https://github.com/nodejs/node/blob/70c32a6d190e2b5d7b9ff9d5b6a459d14e8b7d59/lib/internal/http2/core.js#L3028-L3036
* The spread operator overides earlier values with later ones, so any port
* or host values in the options will be used rather than any values extracted
* from the first argument. In addition, the path overrides the host and port,
* as documented for plaintext connections here:
* https://nodejs.org/api/net.html#net_socket_connect_options_connectlistener
* and for TLS connections here:
* https://nodejs.org/api/tls.html#tls_tls_connect_options_callback. In
* earlier versions of Node, http2.connect passes these options to
* tls.connect but not net.connect, so in the insecure case we still need
* to set the createConnection option above to create the connection
* explicitly. We cannot do that in the TLS case because http2.connect
* passes necessary additional options to tls.connect.
* The first argument just needs to be parseable as a URL and the scheme
* determines whether the connection will be established over TLS or not.
*/
const session = http2.connect( const session = http2.connect(
addressScheme + this.subchannelAddress, addressScheme + getDefaultAuthority(this.channelTarget),
connectionOptions connectionOptions
); );
this.session = session; this.session = session;
@ -328,7 +408,7 @@ export class Subchannel {
return false; return false;
} }
trace( trace(
this.subchannelAddress + this.subchannelAddressString +
' ' + ' ' +
ConnectivityState[this.connectivityState] + ConnectivityState[this.connectivityState] +
' -> ' + ' -> ' +
@ -400,7 +480,7 @@ export class Subchannel {
callRef() { callRef() {
trace( trace(
this.subchannelAddress + this.subchannelAddressString +
' callRefcount ' + ' callRefcount ' +
this.callRefcount + this.callRefcount +
' -> ' + ' -> ' +
@ -417,7 +497,7 @@ export class Subchannel {
callUnref() { callUnref() {
trace( trace(
this.subchannelAddress + this.subchannelAddressString +
' callRefcount ' + ' callRefcount ' +
this.callRefcount + this.callRefcount +
' -> ' + ' -> ' +
@ -435,7 +515,7 @@ export class Subchannel {
ref() { ref() {
trace( trace(
this.subchannelAddress + this.subchannelAddressString +
' callRefcount ' + ' callRefcount ' +
this.refcount + this.refcount +
' -> ' + ' -> ' +
@ -446,7 +526,7 @@ export class Subchannel {
unref() { unref() {
trace( trace(
this.subchannelAddress + this.subchannelAddressString +
' callRefcount ' + ' callRefcount ' +
this.refcount + this.refcount +
' -> ' + ' -> ' +
@ -557,6 +637,6 @@ export class Subchannel {
} }
getAddress(): string { getAddress(): string {
return this.subchannelAddress; return this.subchannelAddressString;
} }
} }

View File

@ -17,7 +17,8 @@
import * as fs from 'fs'; import * as fs from 'fs';
export const CIPHER_SUITES: string | undefined = process.env.GRPC_SSL_CIPHER_SUITES; export const CIPHER_SUITES: string | undefined =
process.env.GRPC_SSL_CIPHER_SUITES;
const DEFAULT_ROOTS_FILE_PATH = process.env.GRPC_DEFAULT_SSL_ROOTS_FILE_PATH; const DEFAULT_ROOTS_FILE_PATH = process.env.GRPC_DEFAULT_SSL_ROOTS_FILE_PATH;
@ -31,4 +32,4 @@ export function getDefaultRootsData(): Buffer | null {
return defaultRootsData; return defaultRootsData;
} }
return null; return null;
} }

View File

@ -21,6 +21,7 @@ import * as assert from 'assert';
import * as resolverManager from '../src/resolver'; import * as resolverManager from '../src/resolver';
import { ServiceConfig } from '../src/service-config'; import { ServiceConfig } from '../src/service-config';
import { StatusObject } from '../src/call-stream'; import { StatusObject } from '../src/call-stream';
import { SubchannelAddress, isTcpSubchannelAddress } from '../src/subchannel';
describe('Name Resolver', () => { describe('Name Resolver', () => {
describe('DNS Names', function() { describe('DNS Names', function() {
@ -33,11 +34,18 @@ describe('Name Resolver', () => {
const target = 'localhost:50051'; const target = 'localhost:50051';
const listener: resolverManager.ResolverListener = { const listener: resolverManager.ResolverListener = {
onSuccessfulResolution: ( onSuccessfulResolution: (
addressList: string[], addressList: SubchannelAddress[],
serviceConfig: ServiceConfig | null, serviceConfig: ServiceConfig | null,
serviceConfigError: StatusObject | null serviceConfigError: StatusObject | null
) => { ) => {
assert(addressList.includes('127.0.0.1:50051')); assert(
addressList.some(
addr =>
isTcpSubchannelAddress(addr) &&
addr.host === '127.0.0.1' &&
addr.port === 50051
)
);
// We would check for the IPv6 address but it needs to be omitted on some Node versions // We would check for the IPv6 address but it needs to be omitted on some Node versions
done(); done();
}, },
@ -52,11 +60,18 @@ describe('Name Resolver', () => {
const target = 'localhost'; const target = 'localhost';
const listener: resolverManager.ResolverListener = { const listener: resolverManager.ResolverListener = {
onSuccessfulResolution: ( onSuccessfulResolution: (
addressList: string[], addressList: SubchannelAddress[],
serviceConfig: ServiceConfig | null, serviceConfig: ServiceConfig | null,
serviceConfigError: StatusObject | null serviceConfigError: StatusObject | null
) => { ) => {
assert(addressList.includes('127.0.0.1:443')); assert(
addressList.some(
addr =>
isTcpSubchannelAddress(addr) &&
addr.host === '127.0.0.1' &&
addr.port === 443
)
);
// We would check for the IPv6 address but it needs to be omitted on some Node versions // We would check for the IPv6 address but it needs to be omitted on some Node versions
done(); done();
}, },
@ -71,11 +86,18 @@ describe('Name Resolver', () => {
const target = '1.2.3.4'; const target = '1.2.3.4';
const listener: resolverManager.ResolverListener = { const listener: resolverManager.ResolverListener = {
onSuccessfulResolution: ( onSuccessfulResolution: (
addressList: string[], addressList: SubchannelAddress[],
serviceConfig: ServiceConfig | null, serviceConfig: ServiceConfig | null,
serviceConfigError: StatusObject | null serviceConfigError: StatusObject | null
) => { ) => {
assert(addressList.includes('1.2.3.4:443')); assert(
addressList.some(
addr =>
isTcpSubchannelAddress(addr) &&
addr.host === '1.2.3.4' &&
addr.port === 443
)
);
// We would check for the IPv6 address but it needs to be omitted on some Node versions // We would check for the IPv6 address but it needs to be omitted on some Node versions
done(); done();
}, },
@ -90,11 +112,18 @@ describe('Name Resolver', () => {
const target = '::1'; const target = '::1';
const listener: resolverManager.ResolverListener = { const listener: resolverManager.ResolverListener = {
onSuccessfulResolution: ( onSuccessfulResolution: (
addressList: string[], addressList: SubchannelAddress[],
serviceConfig: ServiceConfig | null, serviceConfig: ServiceConfig | null,
serviceConfigError: StatusObject | null serviceConfigError: StatusObject | null
) => { ) => {
assert(addressList.includes('[::1]:443')); assert(
addressList.some(
addr =>
isTcpSubchannelAddress(addr) &&
addr.host === '::1' &&
addr.port === 443
)
);
// We would check for the IPv6 address but it needs to be omitted on some Node versions // We would check for the IPv6 address but it needs to be omitted on some Node versions
done(); done();
}, },
@ -109,11 +138,18 @@ describe('Name Resolver', () => {
const target = '[::1]:50051'; const target = '[::1]:50051';
const listener: resolverManager.ResolverListener = { const listener: resolverManager.ResolverListener = {
onSuccessfulResolution: ( onSuccessfulResolution: (
addressList: string[], addressList: SubchannelAddress[],
serviceConfig: ServiceConfig | null, serviceConfig: ServiceConfig | null,
serviceConfigError: StatusObject | null serviceConfigError: StatusObject | null
) => { ) => {
assert(addressList.includes('[::1]:50051')); assert(
addressList.some(
addr =>
isTcpSubchannelAddress(addr) &&
addr.host === '::1' &&
addr.port === 50051
)
);
// We would check for the IPv6 address but it needs to be omitted on some Node versions // We would check for the IPv6 address but it needs to be omitted on some Node versions
done(); done();
}, },
@ -128,7 +164,7 @@ describe('Name Resolver', () => {
const target = 'example.com'; const target = 'example.com';
const listener: resolverManager.ResolverListener = { const listener: resolverManager.ResolverListener = {
onSuccessfulResolution: ( onSuccessfulResolution: (
addressList: string[], addressList: SubchannelAddress[],
serviceConfig: ServiceConfig | null, serviceConfig: ServiceConfig | null,
serviceConfigError: StatusObject | null serviceConfigError: StatusObject | null
) => { ) => {
@ -146,7 +182,7 @@ describe('Name Resolver', () => {
const target = 'loopback4.unittest.grpc.io'; const target = 'loopback4.unittest.grpc.io';
const listener: resolverManager.ResolverListener = { const listener: resolverManager.ResolverListener = {
onSuccessfulResolution: ( onSuccessfulResolution: (
addressList: string[], addressList: SubchannelAddress[],
serviceConfig: ServiceConfig | null, serviceConfig: ServiceConfig | null,
serviceConfigError: StatusObject | null serviceConfigError: StatusObject | null
) => { ) => {
@ -166,7 +202,7 @@ describe('Name Resolver', () => {
const target = 'network-tools.com'; const target = 'network-tools.com';
const listener: resolverManager.ResolverListener = { const listener: resolverManager.ResolverListener = {
onSuccessfulResolution: ( onSuccessfulResolution: (
addressList: string[], addressList: SubchannelAddress[],
serviceConfig: ServiceConfig | null, serviceConfig: ServiceConfig | null,
serviceConfigError: StatusObject | null serviceConfigError: StatusObject | null
) => { ) => {
@ -196,7 +232,7 @@ describe('Name Resolver', () => {
const target2 = 'grpc-test4.sandbox.googleapis.com'; const target2 = 'grpc-test4.sandbox.googleapis.com';
const listener: resolverManager.ResolverListener = { const listener: resolverManager.ResolverListener = {
onSuccessfulResolution: ( onSuccessfulResolution: (
addressList: string[], addressList: SubchannelAddress[],
serviceConfig: ServiceConfig | null, serviceConfig: ServiceConfig | null,
serviceConfigError: StatusObject | null serviceConfigError: StatusObject | null
) => { ) => {
@ -218,11 +254,15 @@ describe('Name Resolver', () => {
const target = 'unix:socket'; const target = 'unix:socket';
const listener: resolverManager.ResolverListener = { const listener: resolverManager.ResolverListener = {
onSuccessfulResolution: ( onSuccessfulResolution: (
addressList: string[], addressList: SubchannelAddress[],
serviceConfig: ServiceConfig | null, serviceConfig: ServiceConfig | null,
serviceConfigError: StatusObject | null serviceConfigError: StatusObject | null
) => { ) => {
assert(addressList.includes('socket')); assert(
addressList.some(
addr => !isTcpSubchannelAddress(addr) && addr.path === 'socket'
)
);
done(); done();
}, },
onError: (error: StatusObject) => { onError: (error: StatusObject) => {
@ -236,11 +276,16 @@ describe('Name Resolver', () => {
const target = 'unix:///tmp/socket'; const target = 'unix:///tmp/socket';
const listener: resolverManager.ResolverListener = { const listener: resolverManager.ResolverListener = {
onSuccessfulResolution: ( onSuccessfulResolution: (
addressList: string[], addressList: SubchannelAddress[],
serviceConfig: ServiceConfig | null, serviceConfig: ServiceConfig | null,
serviceConfigError: StatusObject | null serviceConfigError: StatusObject | null
) => { ) => {
assert(addressList.includes('/tmp/socket')); assert(
addressList.some(
addr =>
!isTcpSubchannelAddress(addr) && addr.path === '/tmp/socket'
)
);
done(); done();
}, },
onError: (error: StatusObject) => { onError: (error: StatusObject) => {