From fb2e7637c0c56185aa7676d2c1c63a97705d458b Mon Sep 17 00:00:00 2001 From: murgatroid99 Date: Tue, 13 Aug 2019 17:58:54 -0700 Subject: [PATCH] Update channel behavior + related classes --- .../grpc-js/src/call-credentials-filter.ts | 11 +- packages/grpc-js/src/call-stream.ts | 19 +- packages/grpc-js/src/channel.ts | 519 ++++++------------ packages/grpc-js/src/deadline-filter.ts | 6 +- .../grpc-js/src/load-balancer-pick-first.ts | 6 +- packages/grpc-js/src/load-balancer.ts | 9 +- packages/grpc-js/src/metadata.ts | 15 +- packages/grpc-js/src/resolver-dns.ts | 6 +- packages/grpc-js/src/resolver.ts | 10 + .../grpc-js/src/resolving-load-balancer.ts | 138 ++++- packages/grpc-js/src/subchannel-pool.ts | 8 +- 11 files changed, 352 insertions(+), 395 deletions(-) diff --git a/packages/grpc-js/src/call-credentials-filter.ts b/packages/grpc-js/src/call-credentials-filter.ts index e3d84237..a58f10ee 100644 --- a/packages/grpc-js/src/call-credentials-filter.ts +++ b/packages/grpc-js/src/call-credentials-filter.ts @@ -17,14 +17,14 @@ import { CallCredentials } from './call-credentials'; import { Call } from './call-stream'; -import { Http2Channel } from './channel'; +import { Channel } from './channel'; import { BaseFilter, Filter, FilterFactory } from './filter'; import { Metadata } from './metadata'; export class CallCredentialsFilter extends BaseFilter implements Filter { private serviceUrl: string; constructor( - private readonly channel: Http2Channel, + private readonly channel: Channel, private readonly stream: Call ) { super(); @@ -44,9 +44,7 @@ export class CallCredentialsFilter extends BaseFilter implements Filter { } async sendMetadata(metadata: Promise): Promise { - const channelCredentials = this.channel.credentials._getCallCredentials(); - const streamCredentials = this.stream.getCredentials(); - const credentials = channelCredentials.compose(streamCredentials); + const credentials = this.stream.getCredentials(); const credsMetadata = credentials.generateMetadata({ service_url: this.serviceUrl, }); @@ -58,8 +56,7 @@ export class CallCredentialsFilter extends BaseFilter implements Filter { export class CallCredentialsFilterFactory implements FilterFactory { - private readonly channel: Http2Channel; - constructor(channel: Http2Channel) { + constructor(private readonly channel: Channel) { this.channel = channel; } diff --git a/packages/grpc-js/src/call-stream.ts b/packages/grpc-js/src/call-stream.ts index 35b957b3..71458600 100644 --- a/packages/grpc-js/src/call-stream.ts +++ b/packages/grpc-js/src/call-stream.ts @@ -19,7 +19,6 @@ import * as http2 from 'http2'; import { Duplex } from 'stream'; import { CallCredentials } from './call-credentials'; -import { Http2Channel } from './channel'; import { Status } from './constants'; import { EmitterAugmentation1 } from './events'; import { Filter } from './filter'; @@ -27,6 +26,7 @@ import { FilterStackFactory } from './filter-stack'; import { Metadata } from './metadata'; import { ObjectDuplex, WriteCallback } from './object-stream'; import { StreamDecoder } from './stream-decoder'; +import { ChannelImplementation } from './channel'; const { HTTP2_HEADER_STATUS, @@ -83,7 +83,7 @@ export type Call = { ObjectDuplex; export class Http2CallStream extends Duplex implements Call { - credentials: CallCredentials = CallCredentials.createEmpty(); + credentials: CallCredentials; filterStack: Filter; private http2Stream: http2.ClientHttp2Stream | null = null; private pendingRead = false; @@ -114,12 +114,14 @@ export class Http2CallStream extends Duplex implements Call { constructor( private readonly methodName: string, - private readonly channel: Http2Channel, + private readonly channel: ChannelImplementation, private readonly options: CallStreamOptions, - filterStackFactory: FilterStackFactory + filterStackFactory: FilterStackFactory, + private readonly channelCallCredentials: CallCredentials ) { super({ objectMode: true }); this.filterStack = filterStackFactory.createFilter(this); + this.credentials = channelCallCredentials; } /** @@ -358,12 +360,7 @@ export class Http2CallStream extends Duplex implements Call { } sendMetadata(metadata: Metadata): void { - this.channel._startHttp2Stream( - this.options.host, - this.methodName, - this, - metadata - ); + this.channel._startCallStream(this, metadata); } private destroyHttp2Stream() { @@ -395,7 +392,7 @@ export class Http2CallStream extends Duplex implements Call { } setCredentials(credentials: CallCredentials): void { - this.credentials = credentials; + this.credentials = this.channelCallCredentials.compose(credentials); } getStatus(): StatusObject | null { diff --git a/packages/grpc-js/src/channel.ts b/packages/grpc-js/src/channel.ts index 68ba66e9..636750a1 100644 --- a/packages/grpc-js/src/channel.ts +++ b/packages/grpc-js/src/channel.ts @@ -15,44 +15,22 @@ * */ -import { EventEmitter } from 'events'; -import * as http2 from 'http2'; -import { checkServerIdentity, PeerCertificate } from 'tls'; -import * as url from 'url'; - -import { CallCredentialsFilterFactory } from './call-credentials-filter'; -import { - Call, - CallStreamOptions, - Deadline, - Http2CallStream, -} from './call-stream'; -import { ChannelCredentials } from './channel-credentials'; -import { ChannelOptions, recognizedOptions } from './channel-options'; -import { CompressionFilterFactory } from './compression-filter'; -import { Status } from './constants'; -import { DeadlineFilterFactory } from './deadline-filter'; -import { FilterStackFactory } from './filter-stack'; -import { Metadata } from './metadata'; -import { MetadataStatusFilterFactory } from './metadata-status-filter'; -import { Http2SubChannel } from './subchannel'; - -const { version: clientVersion } = require('../../package.json'); - -const MIN_CONNECT_TIMEOUT_MS = 20000; -const INITIAL_BACKOFF_MS = 1000; -const BACKOFF_MULTIPLIER = 1.6; -const MAX_BACKOFF_MS = 120000; -const BACKOFF_JITTER = 0.2; - -const { - HTTP2_HEADER_AUTHORITY, - HTTP2_HEADER_CONTENT_TYPE, - HTTP2_HEADER_METHOD, - HTTP2_HEADER_PATH, - HTTP2_HEADER_TE, - HTTP2_HEADER_USER_AGENT, -} = http2.constants; +import { Deadline, Call, Http2CallStream, CallStreamOptions } from "./call-stream"; +import { ChannelCredentials } from "./channel-credentials"; +import { ChannelOptions } from "./channel-options"; +import { ResolvingLoadBalancer } from "./resolving-load-balancer"; +import { SubchannelPool, getSubchannelPool } from "./subchannel-pool"; +import { ChannelControlHelper } from "./load-balancer"; +import { UnavailablePicker, Picker, PickResultType } from "./picker"; +import { Metadata } from "./metadata"; +import { SubchannelConnectivityState } from "./subchannel"; +import { Status } from "./constants"; +import { FilterStackFactory } from "./filter-stack"; +import { CallCredentialsFilterFactory } from "./call-credentials-filter"; +import { DeadlineFilterFactory } from "./deadline-filter"; +import { MetadataStatusFilterFactory } from "./metadata-status-filter"; +import { CompressionFilterFactory } from "./compression-filter"; +import { getDefaultAuthority } from "./resolver"; export enum ConnectivityState { CONNECTING, @@ -62,13 +40,6 @@ export enum ConnectivityState { SHUTDOWN, } -function uniformRandom(min: number, max: number) { - return Math.random() * (max - min) + min; -} - -// todo: maybe we want an interface for load balancing, but no implementation -// for anything complicated - /** * An interface that represents a communication channel to a server specified * by a given address. @@ -128,240 +99,159 @@ export interface Channel { ): Call; } -export class Http2Channel extends EventEmitter implements Channel { - private readonly userAgent: string; - private readonly target: url.URL; - private readonly defaultAuthority: string; +interface ConnectivityStateWatcher { + currentState: ConnectivityState; + timer: NodeJS.Timeout; + callback: (error?: Error) => void; +} + +export class ChannelImplementation implements Channel { + private resolvingLoadBalancer: ResolvingLoadBalancer; + private subchannelPool: SubchannelPool; private connectivityState: ConnectivityState = ConnectivityState.IDLE; - // Helper Promise object only used in the implementation of connect(). - private connecting: Promise | null = null; - /* For now, we have up to one subchannel, which will exist as long as we are - * connecting or trying to connect */ - private subChannel: Http2SubChannel | null = null; + private currentPicker: Picker = new UnavailablePicker(); + private pickQueue: {callStream: Http2CallStream, callMetadata: Metadata}[] = []; + private connectivityStateWatchers: ConnectivityStateWatcher[] = []; + private defaultAuthority: string; private filterStackFactory: FilterStackFactory; - - private subChannelConnectCallback: () => void = () => {}; - private subChannelCloseCallback: () => void = () => {}; - - private backoffTimerId: NodeJS.Timer; - private currentBackoff: number = INITIAL_BACKOFF_MS; - private currentBackoffDeadline: Date; - - private handleStateChange( - oldState: ConnectivityState, - newState: ConnectivityState - ): void { - const now: Date = new Date(); - switch (newState) { - case ConnectivityState.CONNECTING: - if (oldState === ConnectivityState.IDLE) { - this.currentBackoff = INITIAL_BACKOFF_MS; - this.currentBackoffDeadline = new Date( - now.getTime() + INITIAL_BACKOFF_MS - ); - } else if (oldState === ConnectivityState.TRANSIENT_FAILURE) { - this.currentBackoff = Math.min( - this.currentBackoff * BACKOFF_MULTIPLIER, - MAX_BACKOFF_MS - ); - const jitterMagnitude: number = BACKOFF_JITTER * this.currentBackoff; - this.currentBackoffDeadline = new Date( - now.getTime() + - this.currentBackoff + - uniformRandom(-jitterMagnitude, jitterMagnitude) - ); + constructor(private target: string, private readonly credentials: ChannelCredentials, private readonly options: ChannelOptions) { + // TODO: check channel arg for getting a private pool + this.subchannelPool = getSubchannelPool(true); + const channelControlHelper: ChannelControlHelper = { + createSubchannel: (subchannelAddress: string, subchannelArgs: ChannelOptions) => { + return this.subchannelPool.getOrCreateSubchannel(this.target, subchannelAddress, Object.assign({}, this.options, subchannelArgs), this.credentials); + }, + updateState: (connectivityState: ConnectivityState, picker: Picker) => { + this.currentPicker = picker; + const queueCopy = this.pickQueue.slice(); + this.pickQueue = []; + for (const {callStream, callMetadata} of queueCopy) { + this.tryPick(callStream, callMetadata); } - this.startConnecting(); - break; - case ConnectivityState.READY: - this.emit('connect'); - break; - case ConnectivityState.TRANSIENT_FAILURE: - this.subChannel = null; - this.backoffTimerId = setTimeout(() => { - this.transitionToState( - [ConnectivityState.TRANSIENT_FAILURE], - ConnectivityState.CONNECTING - ); - }, this.currentBackoffDeadline.getTime() - now.getTime()); - break; - case ConnectivityState.IDLE: - case ConnectivityState.SHUTDOWN: - if (this.subChannel) { - this.subChannel.close(); - this.subChannel.removeListener( - 'connect', - this.subChannelConnectCallback - ); - this.subChannel.removeListener('close', this.subChannelCloseCallback); - this.subChannel = null; - this.emit('shutdown'); - clearTimeout(this.backoffTimerId); - } - break; - default: - throw new Error('This should never happen'); - } - } - - // Transition from any of a set of oldStates to a specific newState - private transitionToState( - oldStates: ConnectivityState[], - newState: ConnectivityState - ): void { - if (oldStates.indexOf(this.connectivityState) > -1) { - const oldState: ConnectivityState = this.connectivityState; - this.connectivityState = newState; - this.handleStateChange(oldState, newState); - this.emit('connectivityStateChanged', newState); - } - } - - private startConnecting(): void { - const connectionOptions: http2.SecureClientSessionOptions = - this.credentials._getConnectionOptions() || {}; - if (connectionOptions.secureContext !== null) { - // If provided, the value of grpc.ssl_target_name_override should be used - // to override the target hostname when checking server identity. - // This option is used for testing only. - if (this.options['grpc.ssl_target_name_override']) { - const sslTargetNameOverride = this.options[ - 'grpc.ssl_target_name_override' - ]!; - connectionOptions.checkServerIdentity = ( - host: string, - cert: PeerCertificate - ): Error | undefined => { - return checkServerIdentity(sslTargetNameOverride, cert); - }; - connectionOptions.servername = sslTargetNameOverride; + this.updateState(connectivityState); + }, + requestReresolution: () => { + // This should never be called. + throw new Error('Resolving load balancer should never call requestReresolution'); } - } - const subChannel: Http2SubChannel = new Http2SubChannel( - this.target, - connectionOptions, - this.userAgent, - this.options - ); - this.subChannel = subChannel; - const now = new Date(); - const connectionTimeout: number = Math.max( - this.currentBackoffDeadline.getTime() - now.getTime(), - MIN_CONNECT_TIMEOUT_MS - ); - const connectionTimerId: NodeJS.Timer = setTimeout(() => { - // This should trigger the 'close' event, which will send us back to - // TRANSIENT_FAILURE - subChannel.close(); - }, connectionTimeout); - this.subChannelConnectCallback = () => { - // Connection succeeded - clearTimeout(connectionTimerId); - this.transitionToState( - [ConnectivityState.CONNECTING], - ConnectivityState.READY - ); }; - subChannel.once('connect', this.subChannelConnectCallback); - this.subChannelCloseCallback = () => { - // Connection failed - clearTimeout(connectionTimerId); - /* TODO(murgatroid99): verify that this works for - * CONNECTING->TRANSITIVE_FAILURE see nodejs/node#16645 */ - this.transitionToState( - [ConnectivityState.CONNECTING, ConnectivityState.READY], - ConnectivityState.TRANSIENT_FAILURE - ); - }; - subChannel.once('close', this.subChannelCloseCallback); - } - - constructor( - address: string, - readonly credentials: ChannelCredentials, - private readonly options: Partial - ) { - super(); - for (const option in options) { - if (options.hasOwnProperty(option)) { - if (!recognizedOptions.hasOwnProperty(option)) { - console.warn( - `Unrecognized channel argument '${option}' will be ignored.` - ); - } - } - } - if (credentials._isSecure()) { - this.target = new url.URL(`https://${address}`); - } else { - this.target = new url.URL(`http://${address}`); - } - // TODO(murgatroid99): Add more centralized handling of channel options - if (this.options['grpc.default_authority']) { - this.defaultAuthority = this.options['grpc.default_authority'] as string; - } else { - this.defaultAuthority = this.target.host; - } + // TODO: check channel arg for default service config + this.resolvingLoadBalancer = new ResolvingLoadBalancer(target, channelControlHelper, null); this.filterStackFactory = new FilterStackFactory([ new CallCredentialsFilterFactory(this), new DeadlineFilterFactory(this), new MetadataStatusFilterFactory(this), new CompressionFilterFactory(this), ]); - this.currentBackoffDeadline = new Date(); - /* The only purpose of these lines is to ensure that this.backoffTimerId has - * a value of type NodeJS.Timer. */ - this.backoffTimerId = setTimeout(() => {}, 0); - - // Build user-agent string. - this.userAgent = [ - options['grpc.primary_user_agent'], - `grpc-node-js/${clientVersion}`, - options['grpc.secondary_user_agent'], - ] - .filter(e => e) - .join(' '); // remove falsey values first + // TODO(murgatroid99): Add more centralized handling of channel options + if (this.options['grpc.default_authority']) { + this.defaultAuthority = this.options['grpc.default_authority'] as string; + } else { + this.defaultAuthority = getDefaultAuthority(target); + } } - _startHttp2Stream( - authority: string, - methodName: string, - stream: Http2CallStream, - metadata: Metadata - ) { - const connectMetadata: Promise = this.connect().then(() => - metadata.clone() - ); - const finalMetadata: Promise = stream.filterStack.sendMetadata( - connectMetadata - ); - finalMetadata - .then(metadataValue => { - const headers = metadataValue.toHttp2Headers(); - headers[HTTP2_HEADER_AUTHORITY] = authority; - headers[HTTP2_HEADER_USER_AGENT] = this.userAgent; - headers[HTTP2_HEADER_CONTENT_TYPE] = 'application/grpc'; - headers[HTTP2_HEADER_METHOD] = 'POST'; - headers[HTTP2_HEADER_PATH] = methodName; - headers[HTTP2_HEADER_TE] = 'trailers'; - if (this.connectivityState === ConnectivityState.READY) { - const subChannel: Http2SubChannel = this.subChannel!; - subChannel.startCallStream(metadataValue, stream); + /** + * Check the picker output for the given call and corresponding metadata, + * and take any relevant actions. Should not be called while iterating + * over pickQueue. + * @param callStream + * @param callMetadata + */ + private tryPick(callStream: Http2CallStream, callMetadata: Metadata) { + const pickResult = this.currentPicker.pick({metadata: callMetadata}); + switch(pickResult.pickResultType) { + case PickResultType.COMPLETE: + if (pickResult.subchannel === null) { + // End the call with an error } else { - /* In this case, we lost the connection while finalizing - * metadata. That should be very unusual */ - setImmediate(() => { - this._startHttp2Stream(authority, methodName, stream, metadata); + /* If the subchannel disconnects between calling pick and getting + * the filter stack metadata, the call will end with an error. */ + callStream.filterStack.sendMetadata(Promise.resolve(new Metadata())).then((finalMetadata) => { + if (pickResult.subchannel!.getConnectivityState() === SubchannelConnectivityState.READY) { + pickResult.subchannel!.startCallStream(callMetadata, callStream); + } else { + callStream.cancelWithStatus(Status.UNAVAILABLE, 'Connection dropped while starting call'); + } + }, + (error: Error & { code: number }) => { + // We assume the error code isn't 0 (Status.OK) + callStream.cancelWithStatus( + error.code || Status.UNKNOWN, + `Getting metadata from plugin failed with error: ${error.message}` + ); }); } - }) - .catch((error: Error & { code: number }) => { - // We assume the error code isn't 0 (Status.OK) - stream.cancelWithStatus( - error.code || Status.UNKNOWN, - `Getting metadata from plugin failed with error: ${error.message}` - ); - }); + break; + case PickResultType.QUEUE: + this.pickQueue.push({callStream, callMetadata}); + break; + case PickResultType.TRANSIENT_FAILURE: + if (callMetadata.getOptions().waitForReady) { + this.pickQueue.push({callStream, callMetadata}); + } else { + callStream.cancelWithStatus(pickResult.status!.code, pickResult.status!.details); + } + break; + } + } + + private removeConnectivityStateWatcher(watcherObject: ConnectivityStateWatcher) { + const watcherIndex = this.connectivityStateWatchers.findIndex((value) => value === watcherObject); + if (watcherIndex >= 0) { + this.connectivityStateWatchers.splice(watcherIndex, 1); + } + } + + private updateState(newState: ConnectivityState): void { + this.connectivityState = newState; + const watchersCopy = this.connectivityStateWatchers.slice(); + for (const watcherObject of watchersCopy) { + if (newState !== watcherObject.currentState) { + watcherObject.callback(); + clearTimeout(watcherObject.timer); + this.removeConnectivityStateWatcher(watcherObject); + } + } + } + + _startCallStream(stream: Http2CallStream, metadata: Metadata) { + this.tryPick(stream, metadata); + } + + close() { + this.resolvingLoadBalancer.destroy(); + this.updateState(ConnectivityState.SHUTDOWN); + } + + getTarget() { + return this.target; + } + + getConnectivityState() { + return this.connectivityState; + } + + watchConnectivityState( + currentState: ConnectivityState, + deadline: Date | number, + callback: (error?: Error) => void + ): void { + const deadlineDate: Date = deadline instanceof Date ? deadline : new Date(deadline); + const now = new Date(); + if (deadlineDate <= now) { + process.nextTick(callback, new Error('Deadline passed without connectivity state change')); + return; + } + const watcherObject = { + currentState, + callback, + timer: setTimeout(() => { + this.removeConnectivityStateWatcher(watcherObject); + callback(new Error('Deadline passed without connectivity state change')); + }, deadlineDate.getTime() - now.getTime()) + }; + this.connectivityStateWatchers.push(watcherObject); } createCall( @@ -385,106 +275,9 @@ export class Http2Channel extends EventEmitter implements Channel { method, this, finalOptions, - this.filterStackFactory + this.filterStackFactory, + this.credentials._getCallCredentials() ); return stream; } - - /** - * Attempts to connect, returning a Promise that resolves when the connection - * is successful, or rejects if the channel is shut down. - */ - private connect(): Promise { - if (this.connectivityState === ConnectivityState.READY) { - return Promise.resolve(); - } else if (this.connectivityState === ConnectivityState.SHUTDOWN) { - return Promise.reject(new Error('Channel has been shut down')); - } else { - // In effect, this.connecting is only assigned upon the first attempt to - // transition from IDLE to CONNECTING, so this condition could have also - // been (connectivityState === IDLE). - if (!this.connecting) { - this.connecting = new Promise((resolve, reject) => { - this.transitionToState( - [ConnectivityState.IDLE], - ConnectivityState.CONNECTING - ); - const onConnect = () => { - this.connecting = null; - this.removeListener('shutdown', onShutdown); - resolve(); - }; - const onShutdown = () => { - this.connecting = null; - this.removeListener('connect', onConnect); - reject(new Error('Channel has been shut down')); - }; - this.once('connect', onConnect); - this.once('shutdown', onShutdown); - }); - } - return this.connecting; - } - } - - getConnectivityState(tryToConnect: boolean): ConnectivityState { - if (tryToConnect) { - this.transitionToState( - [ConnectivityState.IDLE], - ConnectivityState.CONNECTING - ); - } - return this.connectivityState; - } - - watchConnectivityState( - currentState: ConnectivityState, - deadline: Date | number, - callback: (error?: Error) => void - ) { - if (this.connectivityState !== currentState) { - /* If the connectivity state is different from the provided currentState, - * we assume that a state change has successfully occurred */ - setImmediate(callback); - } else { - let deadlineMs = 0; - if (deadline instanceof Date) { - deadlineMs = deadline.getTime(); - } else { - deadlineMs = deadline; - } - let timeout: number = deadlineMs - Date.now(); - if (timeout < 0) { - timeout = 0; - } - const timeoutId = setTimeout(() => { - this.removeListener('connectivityStateChanged', eventCb); - callback(new Error('Channel state did not change before deadline')); - }, timeout); - const eventCb = () => { - clearTimeout(timeoutId); - callback(); - }; - this.once('connectivityStateChanged', eventCb); - } - } - - getTarget() { - return this.target.toString(); - } - - close() { - if (this.connectivityState === ConnectivityState.SHUTDOWN) { - throw new Error('Channel has been shut down'); - } - this.transitionToState( - [ - ConnectivityState.CONNECTING, - ConnectivityState.READY, - ConnectivityState.TRANSIENT_FAILURE, - ConnectivityState.IDLE, - ], - ConnectivityState.SHUTDOWN - ); - } -} +} \ No newline at end of file diff --git a/packages/grpc-js/src/deadline-filter.ts b/packages/grpc-js/src/deadline-filter.ts index 771cb96a..5c3cf532 100644 --- a/packages/grpc-js/src/deadline-filter.ts +++ b/packages/grpc-js/src/deadline-filter.ts @@ -16,7 +16,7 @@ */ import { Call } from './call-stream'; -import { ConnectivityState, Http2Channel } from './channel'; +import { ConnectivityState, Channel } from './channel'; import { Status } from './constants'; import { BaseFilter, Filter, FilterFactory } from './filter'; import { Metadata } from './metadata'; @@ -44,7 +44,7 @@ export class DeadlineFilter extends BaseFilter implements Filter { private timer: NodeJS.Timer | null = null; private deadline: number; constructor( - private readonly channel: Http2Channel, + private readonly channel: Channel, private readonly callStream: Call ) { super(); @@ -85,7 +85,7 @@ export class DeadlineFilter extends BaseFilter implements Filter { } export class DeadlineFilterFactory implements FilterFactory { - constructor(private readonly channel: Http2Channel) {} + constructor(private readonly channel: Channel) {} createFilter(callStream: Call): DeadlineFilter { return new DeadlineFilter(this.channel, callStream); diff --git a/packages/grpc-js/src/load-balancer-pick-first.ts b/packages/grpc-js/src/load-balancer-pick-first.ts index 1fbbf425..a07c857e 100644 --- a/packages/grpc-js/src/load-balancer-pick-first.ts +++ b/packages/grpc-js/src/load-balancer-pick-first.ts @@ -162,7 +162,7 @@ export class PickFirstLoadBalancer implements LoadBalancer { } } - updateAddressList(addressList: string[], lbConfig?: LoadBalancingConfig): void { + updateAddressList(addressList: string[], lbConfig: LoadBalancingConfig | null): void { // lbConfig has no useful information for pick first load balancing this.latestAddressList = addressList; this.connectToAddressList(); @@ -191,6 +191,10 @@ export class PickFirstLoadBalancer implements LoadBalancer { getTypeName(): string { return TYPE_NAME; } + + replaceChannelControlHelper(channelControlHelper: ChannelControlHelper) { + this.channelControlHelper = channelControlHelper; + } } export function setup(): void { diff --git a/packages/grpc-js/src/load-balancer.ts b/packages/grpc-js/src/load-balancer.ts index 02f4ad4b..14aa3e47 100644 --- a/packages/grpc-js/src/load-balancer.ts +++ b/packages/grpc-js/src/load-balancer.ts @@ -31,7 +31,7 @@ export interface ChannelControlHelper { * @param subchannelAddress The address to connect to * @param subchannelArgs Extra channel arguments specified by the load balancer */ - createSubchannel(subchannelAddress: String, subchannelArgs: ChannelOptions): Subchannel; + createSubchannel(subchannelAddress: string, subchannelArgs: ChannelOptions): Subchannel; /** * Passes a new subchannel picker up to the channel. This is called if either * the connectivity state changes or if a different picker is needed for any @@ -60,7 +60,7 @@ export interface LoadBalancer { * @param lbConfig The load balancing config object from the service config, * if one was provided */ - updateAddressList(addressList: string[], lbConfig?: LoadBalancingConfig): void; + updateAddressList(addressList: string[], lbConfig: LoadBalancingConfig | null): void; /** * If the load balancer is currently in the IDLE state, start connecting. */ @@ -82,6 +82,11 @@ export interface LoadBalancer { * balancer implementation class was registered with. */ getTypeName(): string; + /** + * Replace the existing ChannelControlHelper with a new one + * @param channelControlHelper The new ChannelControlHelper to use from now on + */ + replaceChannelControlHelper(channelControlHelper: ChannelControlHelper): void; } export interface LoadBalancerConstructor { diff --git a/packages/grpc-js/src/metadata.ts b/packages/grpc-js/src/metadata.ts index d321b9b0..4f0ed14d 100644 --- a/packages/grpc-js/src/metadata.ts +++ b/packages/grpc-js/src/metadata.ts @@ -62,7 +62,7 @@ function validate(key: string, value?: MetadataValue): void { } } -interface MetadataOptions { +export interface MetadataOptions { /* Signal that the request is idempotent. Defaults to false */ idempotentRequest?: boolean; /* Signal that the call should not return UNAVAILABLE before it has @@ -80,8 +80,15 @@ interface MetadataOptions { */ export class Metadata { protected internalRepr: MetadataObject = new Map(); + private options: MetadataOptions; - constructor(private options?: MetadataOptions) {} + constructor(options?: MetadataOptions) { + if (options === undefined) { + this.options = {}; + } else { + this.options = options; + } + } /** * Sets the given value for the given key by replacing any other values @@ -200,6 +207,10 @@ export class Metadata { this.options = options; } + getOptions(): MetadataOptions { + return this.options; + } + /** * Creates an OutgoingHttpHeaders object that can be used with the http2 API. */ diff --git a/packages/grpc-js/src/resolver-dns.ts b/packages/grpc-js/src/resolver-dns.ts index f1e9ba8d..52b8893b 100644 --- a/packages/grpc-js/src/resolver-dns.ts +++ b/packages/grpc-js/src/resolver-dns.ts @@ -20,6 +20,7 @@ import * as util from 'util'; import { extractAndSelectServiceConfig, ServiceConfig } from './service-config'; import { ServiceError } from './call'; import { Status } from './constants'; +import { URL } from 'url'; /* These regular expressions match IP addresses with optional ports in different * formats. In each case, capture group 1 contains the address, and capture @@ -87,7 +88,6 @@ class DnsResolver implements Resolver { } } this.percentage = Math.random() * 100; - this.startResolution(); } private startResolution() { @@ -140,6 +140,10 @@ class DnsResolver implements Resolver { this.startResolution(); } } + + static getDefaultAuthority(target: string): string { + return new URL(target).hostname; + } } export function setup(): void { diff --git a/packages/grpc-js/src/resolver.ts b/packages/grpc-js/src/resolver.ts index 10a48537..f2811591 100644 --- a/packages/grpc-js/src/resolver.ts +++ b/packages/grpc-js/src/resolver.ts @@ -29,6 +29,7 @@ export interface Resolver { export interface ResolverConstructor { new(target: string, listener: ResolverListener): Resolver; + getDefaultAuthority(target:string): string; } const registeredResolvers: {[prefix: string]: ResolverConstructor} = {}; @@ -52,4 +53,13 @@ export function createResolver(target: string, listener: ResolverListener): Reso return new defaultResolver(target, listener); } throw new Error('No resolver could be created for the provided target'); +} + +export function getDefaultAuthority(target: string): string { + for (const prefix of Object.keys(registerDefaultResolver)) { + if (target.startsWith(prefix)) { + return registeredResolvers[prefix].getDefaultAuthority(target); + } + } + throw new Error(`Invalid target "${target}"`); } \ No newline at end of file diff --git a/packages/grpc-js/src/resolving-load-balancer.ts b/packages/grpc-js/src/resolving-load-balancer.ts index a403a220..ce863f7f 100644 --- a/packages/grpc-js/src/resolving-load-balancer.ts +++ b/packages/grpc-js/src/resolving-load-balancer.ts @@ -15,16 +15,23 @@ * */ -import { ChannelControlHelper, LoadBalancer, isLoadBalancerNameRegistered } from "./load-balancer"; +import { ChannelControlHelper, LoadBalancer, isLoadBalancerNameRegistered, createLoadBalancer } from "./load-balancer"; import { ServiceConfig } from "./service-config"; import { ConnectivityState } from "./channel"; import { createResolver, Resolver } from "./resolver"; import { ServiceError } from "./call"; +import { ChannelOptions } from "./channel-options"; +import { Picker, UnavailablePicker, QueuePicker } from "./picker"; +import { LoadBalancingConfig } from "./load-balancing-config"; const DEFAULT_LOAD_BALANCER_NAME = 'pick_first'; -export class ResolvingLoadBalancer { +export class ResolvingLoadBalancer implements LoadBalancer { private innerResolver: Resolver; + /** + * Current internal load balancer used for handling calls. + * Invariant: innerLoadBalancer === null => pendingReplacementLoadBalancer === null. + */ private innerLoadBalancer: LoadBalancer | null = null; private pendingReplacementLoadBalancer: LoadBalancer | null = null; private currentState: ConnectivityState = ConnectivityState.IDLE; @@ -36,8 +43,32 @@ export class ResolvingLoadBalancer { */ private previousServiceConfig: ServiceConfig | null | undefined = undefined; - constructor (private target: string, private channelControlHelper: ChannelControlHelper, private defaultServiceConfig: ServiceConfig | null) { + private innerBalancerState: ConnectivityState = ConnectivityState.IDLE; + /** + * The most recent reported state of the pendingReplacementLoadBalancer. + * Starts at IDLE for type simplicity. This should get updated as soon as the + * pendingReplacementLoadBalancer gets constructed. + */ + private replacementBalancerState: ConnectivityState = ConnectivityState.IDLE; + /** + * The picker associated with the replacementBalancerState. Starts as an + * UnavailablePicker for type simplicity. This should get updated as soon as + * the pendingReplacementLoadBalancer gets constructed. + */ + private replacementBalancerPicker: Picker = new UnavailablePicker(); + + /** + * ChannelControlHelper for the innerLoadBalancer. + */ + private readonly innerChannelControlHelper: ChannelControlHelper; + /** + * ChannelControlHelper for the pendingReplacementLoadBalancer. + */ + private readonly replacementChannelControlHelper: ChannelControlHelper; + + constructor (private target: string, private channelControlHelper: ChannelControlHelper, private defaultServiceConfig: ServiceConfig | null) { + this.channelControlHelper.updateState(ConnectivityState.IDLE, new QueuePicker(this)); this.innerResolver = createResolver(target, { onSuccessfulResolution: (addressList: string[], serviceConfig: ServiceConfig | null, serviceConfigError: ServiceError | null) => { let workingServiceConfig: ServiceConfig | null = null; @@ -57,14 +88,17 @@ export class ResolvingLoadBalancer { this.previousServiceConfig = serviceConfig; } let loadBalancerName: string | null = null; + let loadBalancingConfig: LoadBalancingConfig | null = null; if (workingServiceConfig === null || workingServiceConfig.loadBalancingConfig.length === 0) { loadBalancerName = DEFAULT_LOAD_BALANCER_NAME; } else { for (const lbConfig of workingServiceConfig.loadBalancingConfig) { + // Iterating through a oneof looking for whichever one is populated for (const key in lbConfig) { if (Object.prototype.hasOwnProperty.call(lbConfig, key)) { if (isLoadBalancerNameRegistered(key)) { loadBalancerName = key; + loadBalancingConfig = lbConfig; break; } } @@ -79,14 +113,112 @@ export class ResolvingLoadBalancer { return; } } + if (this.innerLoadBalancer === null) { + this.innerLoadBalancer = createLoadBalancer(loadBalancerName, this.innerChannelControlHelper)!; + this.innerLoadBalancer.updateAddressList(addressList, loadBalancingConfig); + } else if (this.innerLoadBalancer.getTypeName() === loadBalancerName) { + this.innerLoadBalancer.updateAddressList(addressList, loadBalancingConfig); + } else { + if (this.pendingReplacementLoadBalancer === null || this.pendingReplacementLoadBalancer.getTypeName() !== loadBalancerName) { + if (this.pendingReplacementLoadBalancer !== null) { + this.pendingReplacementLoadBalancer.destroy(); + } + this.pendingReplacementLoadBalancer = createLoadBalancer(loadBalancerName, this.replacementChannelControlHelper)!; + } + this.pendingReplacementLoadBalancer.updateAddressList(addressList, loadBalancingConfig); + } }, onError: (error: ServiceError) => { this.handleResolutionFailure(error); } }); + + this.innerChannelControlHelper = { + createSubchannel: (subchannelAddress: string, subchannelArgs: ChannelOptions) => { + return this.channelControlHelper.createSubchannel(subchannelAddress, subchannelArgs); + }, + updateState: (connectivityState: ConnectivityState, picker: Picker) => { + this.innerBalancerState = connectivityState; + if (connectivityState === ConnectivityState.TRANSIENT_FAILURE && this.pendingReplacementLoadBalancer !== null) { + this.switchOverReplacementBalancer(); + } else { + this.channelControlHelper.updateState(connectivityState, picker); + } + }, + requestReresolution: () => { + if (this.pendingReplacementLoadBalancer === null) { + this.innerResolver.updateResolution(); + } + } + } + + this.replacementChannelControlHelper = { + createSubchannel: (subchannelAddress: string, subchannelArgs: ChannelOptions) => { + return this.channelControlHelper.createSubchannel(subchannelAddress, subchannelArgs); + }, + updateState: (connectivityState: ConnectivityState, picker: Picker) => { + this.replacementBalancerState = connectivityState; + this.replacementBalancerPicker = picker; + if (connectivityState === ConnectivityState.READY) { + this.switchOverReplacementBalancer(); + } + }, + requestReresolution: () => { + this.innerResolver.updateResolution(); + } + }; + } + + /** + * Stop using the current innerLoadBalancer and replace it with the + * pendingReplacementLoadBalancer. Must only be called if both of + * those are currently not null. + */ + private switchOverReplacementBalancer() { + this.innerLoadBalancer!.destroy(); + this.innerLoadBalancer = this.pendingReplacementLoadBalancer!; + this.innerLoadBalancer.replaceChannelControlHelper(this.innerChannelControlHelper); + this.pendingReplacementLoadBalancer = null; + this.innerBalancerState = this.replacementBalancerState; + this.channelControlHelper.updateState(this.replacementBalancerState, this.replacementBalancerPicker); } private handleResolutionFailure(error: ServiceError) { } + + exitIdle() { + this.innerResolver.updateResolution(); + if (this.innerLoadBalancer !== null) { + this.innerLoadBalancer.exitIdle(); + } + } + + updateAddressList(addressList: string[], lbConfig: LoadBalancingConfig | null) { + throw new Error('updateAddressList not supported on ResolvingLoadBalancer'); + } + + resetBackoff() { + // TODO + } + + destroy() { + if (this.innerLoadBalancer !== null) { + this.innerLoadBalancer.destroy(); + this.innerLoadBalancer = null; + } + if (this.pendingReplacementLoadBalancer !== null) { + this.pendingReplacementLoadBalancer.destroy(); + this.pendingReplacementLoadBalancer = null; + } + // Go to another state? + } + + getTypeName() { + return 'resolving_load_balancer'; + } + + replaceChannelControlHelper(channelControlHelper: ChannelControlHelper) { + this.channelControlHelper = channelControlHelper; + } } \ No newline at end of file diff --git a/packages/grpc-js/src/subchannel-pool.ts b/packages/grpc-js/src/subchannel-pool.ts index 06b95827..55274295 100644 --- a/packages/grpc-js/src/subchannel-pool.ts +++ b/packages/grpc-js/src/subchannel-pool.ts @@ -73,6 +73,10 @@ export class SubchannelPool { const globalSubchannelPool = new SubchannelPool(true); -export function getOrCreateSubchannel(channelTarget: string, subchannelTarget: string, channelArguments: ChannelOptions, channelCredentials: ChannelCredentials): Subchannel { - return globalSubchannelPool.getOrCreateSubchannel(channelTarget, subchannelTarget, channelArguments, channelCredentials); +export function getSubchannelPool(global: boolean): SubchannelPool { + if (global) { + return globalSubchannelPool; + } else { + return new SubchannelPool(false); + } } \ No newline at end of file