mirror of https://github.com/grpc/grpc-node.git
Update channel behavior + related classes
This commit is contained in:
parent
e612cd9934
commit
fb2e7637c0
|
|
@ -17,14 +17,14 @@
|
|||
|
||||
import { CallCredentials } from './call-credentials';
|
||||
import { Call } from './call-stream';
|
||||
import { Http2Channel } from './channel';
|
||||
import { Channel } from './channel';
|
||||
import { BaseFilter, Filter, FilterFactory } from './filter';
|
||||
import { Metadata } from './metadata';
|
||||
|
||||
export class CallCredentialsFilter extends BaseFilter implements Filter {
|
||||
private serviceUrl: string;
|
||||
constructor(
|
||||
private readonly channel: Http2Channel,
|
||||
private readonly channel: Channel,
|
||||
private readonly stream: Call
|
||||
) {
|
||||
super();
|
||||
|
|
@ -44,9 +44,7 @@ export class CallCredentialsFilter extends BaseFilter implements Filter {
|
|||
}
|
||||
|
||||
async sendMetadata(metadata: Promise<Metadata>): Promise<Metadata> {
|
||||
const channelCredentials = this.channel.credentials._getCallCredentials();
|
||||
const streamCredentials = this.stream.getCredentials();
|
||||
const credentials = channelCredentials.compose(streamCredentials);
|
||||
const credentials = this.stream.getCredentials();
|
||||
const credsMetadata = credentials.generateMetadata({
|
||||
service_url: this.serviceUrl,
|
||||
});
|
||||
|
|
@ -58,8 +56,7 @@ export class CallCredentialsFilter extends BaseFilter implements Filter {
|
|||
|
||||
export class CallCredentialsFilterFactory
|
||||
implements FilterFactory<CallCredentialsFilter> {
|
||||
private readonly channel: Http2Channel;
|
||||
constructor(channel: Http2Channel) {
|
||||
constructor(private readonly channel: Channel) {
|
||||
this.channel = channel;
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -19,7 +19,6 @@ import * as http2 from 'http2';
|
|||
import { Duplex } from 'stream';
|
||||
|
||||
import { CallCredentials } from './call-credentials';
|
||||
import { Http2Channel } from './channel';
|
||||
import { Status } from './constants';
|
||||
import { EmitterAugmentation1 } from './events';
|
||||
import { Filter } from './filter';
|
||||
|
|
@ -27,6 +26,7 @@ import { FilterStackFactory } from './filter-stack';
|
|||
import { Metadata } from './metadata';
|
||||
import { ObjectDuplex, WriteCallback } from './object-stream';
|
||||
import { StreamDecoder } from './stream-decoder';
|
||||
import { ChannelImplementation } from './channel';
|
||||
|
||||
const {
|
||||
HTTP2_HEADER_STATUS,
|
||||
|
|
@ -83,7 +83,7 @@ export type Call = {
|
|||
ObjectDuplex<WriteObject, Buffer>;
|
||||
|
||||
export class Http2CallStream extends Duplex implements Call {
|
||||
credentials: CallCredentials = CallCredentials.createEmpty();
|
||||
credentials: CallCredentials;
|
||||
filterStack: Filter;
|
||||
private http2Stream: http2.ClientHttp2Stream | null = null;
|
||||
private pendingRead = false;
|
||||
|
|
@ -114,12 +114,14 @@ export class Http2CallStream extends Duplex implements Call {
|
|||
|
||||
constructor(
|
||||
private readonly methodName: string,
|
||||
private readonly channel: Http2Channel,
|
||||
private readonly channel: ChannelImplementation,
|
||||
private readonly options: CallStreamOptions,
|
||||
filterStackFactory: FilterStackFactory
|
||||
filterStackFactory: FilterStackFactory,
|
||||
private readonly channelCallCredentials: CallCredentials
|
||||
) {
|
||||
super({ objectMode: true });
|
||||
this.filterStack = filterStackFactory.createFilter(this);
|
||||
this.credentials = channelCallCredentials;
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -358,12 +360,7 @@ export class Http2CallStream extends Duplex implements Call {
|
|||
}
|
||||
|
||||
sendMetadata(metadata: Metadata): void {
|
||||
this.channel._startHttp2Stream(
|
||||
this.options.host,
|
||||
this.methodName,
|
||||
this,
|
||||
metadata
|
||||
);
|
||||
this.channel._startCallStream(this, metadata);
|
||||
}
|
||||
|
||||
private destroyHttp2Stream() {
|
||||
|
|
@ -395,7 +392,7 @@ export class Http2CallStream extends Duplex implements Call {
|
|||
}
|
||||
|
||||
setCredentials(credentials: CallCredentials): void {
|
||||
this.credentials = credentials;
|
||||
this.credentials = this.channelCallCredentials.compose(credentials);
|
||||
}
|
||||
|
||||
getStatus(): StatusObject | null {
|
||||
|
|
|
|||
|
|
@ -15,44 +15,22 @@
|
|||
*
|
||||
*/
|
||||
|
||||
import { EventEmitter } from 'events';
|
||||
import * as http2 from 'http2';
|
||||
import { checkServerIdentity, PeerCertificate } from 'tls';
|
||||
import * as url from 'url';
|
||||
|
||||
import { CallCredentialsFilterFactory } from './call-credentials-filter';
|
||||
import {
|
||||
Call,
|
||||
CallStreamOptions,
|
||||
Deadline,
|
||||
Http2CallStream,
|
||||
} from './call-stream';
|
||||
import { ChannelCredentials } from './channel-credentials';
|
||||
import { ChannelOptions, recognizedOptions } from './channel-options';
|
||||
import { CompressionFilterFactory } from './compression-filter';
|
||||
import { Status } from './constants';
|
||||
import { DeadlineFilterFactory } from './deadline-filter';
|
||||
import { FilterStackFactory } from './filter-stack';
|
||||
import { Metadata } from './metadata';
|
||||
import { MetadataStatusFilterFactory } from './metadata-status-filter';
|
||||
import { Http2SubChannel } from './subchannel';
|
||||
|
||||
const { version: clientVersion } = require('../../package.json');
|
||||
|
||||
const MIN_CONNECT_TIMEOUT_MS = 20000;
|
||||
const INITIAL_BACKOFF_MS = 1000;
|
||||
const BACKOFF_MULTIPLIER = 1.6;
|
||||
const MAX_BACKOFF_MS = 120000;
|
||||
const BACKOFF_JITTER = 0.2;
|
||||
|
||||
const {
|
||||
HTTP2_HEADER_AUTHORITY,
|
||||
HTTP2_HEADER_CONTENT_TYPE,
|
||||
HTTP2_HEADER_METHOD,
|
||||
HTTP2_HEADER_PATH,
|
||||
HTTP2_HEADER_TE,
|
||||
HTTP2_HEADER_USER_AGENT,
|
||||
} = http2.constants;
|
||||
import { Deadline, Call, Http2CallStream, CallStreamOptions } from "./call-stream";
|
||||
import { ChannelCredentials } from "./channel-credentials";
|
||||
import { ChannelOptions } from "./channel-options";
|
||||
import { ResolvingLoadBalancer } from "./resolving-load-balancer";
|
||||
import { SubchannelPool, getSubchannelPool } from "./subchannel-pool";
|
||||
import { ChannelControlHelper } from "./load-balancer";
|
||||
import { UnavailablePicker, Picker, PickResultType } from "./picker";
|
||||
import { Metadata } from "./metadata";
|
||||
import { SubchannelConnectivityState } from "./subchannel";
|
||||
import { Status } from "./constants";
|
||||
import { FilterStackFactory } from "./filter-stack";
|
||||
import { CallCredentialsFilterFactory } from "./call-credentials-filter";
|
||||
import { DeadlineFilterFactory } from "./deadline-filter";
|
||||
import { MetadataStatusFilterFactory } from "./metadata-status-filter";
|
||||
import { CompressionFilterFactory } from "./compression-filter";
|
||||
import { getDefaultAuthority } from "./resolver";
|
||||
|
||||
export enum ConnectivityState {
|
||||
CONNECTING,
|
||||
|
|
@ -62,13 +40,6 @@ export enum ConnectivityState {
|
|||
SHUTDOWN,
|
||||
}
|
||||
|
||||
function uniformRandom(min: number, max: number) {
|
||||
return Math.random() * (max - min) + min;
|
||||
}
|
||||
|
||||
// todo: maybe we want an interface for load balancing, but no implementation
|
||||
// for anything complicated
|
||||
|
||||
/**
|
||||
* An interface that represents a communication channel to a server specified
|
||||
* by a given address.
|
||||
|
|
@ -128,240 +99,159 @@ export interface Channel {
|
|||
): Call;
|
||||
}
|
||||
|
||||
export class Http2Channel extends EventEmitter implements Channel {
|
||||
private readonly userAgent: string;
|
||||
private readonly target: url.URL;
|
||||
private readonly defaultAuthority: string;
|
||||
interface ConnectivityStateWatcher {
|
||||
currentState: ConnectivityState;
|
||||
timer: NodeJS.Timeout;
|
||||
callback: (error?: Error) => void;
|
||||
}
|
||||
|
||||
export class ChannelImplementation implements Channel {
|
||||
private resolvingLoadBalancer: ResolvingLoadBalancer;
|
||||
private subchannelPool: SubchannelPool;
|
||||
private connectivityState: ConnectivityState = ConnectivityState.IDLE;
|
||||
// Helper Promise object only used in the implementation of connect().
|
||||
private connecting: Promise<void> | null = null;
|
||||
/* For now, we have up to one subchannel, which will exist as long as we are
|
||||
* connecting or trying to connect */
|
||||
private subChannel: Http2SubChannel | null = null;
|
||||
private currentPicker: Picker = new UnavailablePicker();
|
||||
private pickQueue: {callStream: Http2CallStream, callMetadata: Metadata}[] = [];
|
||||
private connectivityStateWatchers: ConnectivityStateWatcher[] = [];
|
||||
private defaultAuthority: string;
|
||||
private filterStackFactory: FilterStackFactory;
|
||||
|
||||
private subChannelConnectCallback: () => void = () => {};
|
||||
private subChannelCloseCallback: () => void = () => {};
|
||||
|
||||
private backoffTimerId: NodeJS.Timer;
|
||||
private currentBackoff: number = INITIAL_BACKOFF_MS;
|
||||
private currentBackoffDeadline: Date;
|
||||
|
||||
private handleStateChange(
|
||||
oldState: ConnectivityState,
|
||||
newState: ConnectivityState
|
||||
): void {
|
||||
const now: Date = new Date();
|
||||
switch (newState) {
|
||||
case ConnectivityState.CONNECTING:
|
||||
if (oldState === ConnectivityState.IDLE) {
|
||||
this.currentBackoff = INITIAL_BACKOFF_MS;
|
||||
this.currentBackoffDeadline = new Date(
|
||||
now.getTime() + INITIAL_BACKOFF_MS
|
||||
);
|
||||
} else if (oldState === ConnectivityState.TRANSIENT_FAILURE) {
|
||||
this.currentBackoff = Math.min(
|
||||
this.currentBackoff * BACKOFF_MULTIPLIER,
|
||||
MAX_BACKOFF_MS
|
||||
);
|
||||
const jitterMagnitude: number = BACKOFF_JITTER * this.currentBackoff;
|
||||
this.currentBackoffDeadline = new Date(
|
||||
now.getTime() +
|
||||
this.currentBackoff +
|
||||
uniformRandom(-jitterMagnitude, jitterMagnitude)
|
||||
);
|
||||
constructor(private target: string, private readonly credentials: ChannelCredentials, private readonly options: ChannelOptions) {
|
||||
// TODO: check channel arg for getting a private pool
|
||||
this.subchannelPool = getSubchannelPool(true);
|
||||
const channelControlHelper: ChannelControlHelper = {
|
||||
createSubchannel: (subchannelAddress: string, subchannelArgs: ChannelOptions) => {
|
||||
return this.subchannelPool.getOrCreateSubchannel(this.target, subchannelAddress, Object.assign({}, this.options, subchannelArgs), this.credentials);
|
||||
},
|
||||
updateState: (connectivityState: ConnectivityState, picker: Picker) => {
|
||||
this.currentPicker = picker;
|
||||
const queueCopy = this.pickQueue.slice();
|
||||
this.pickQueue = [];
|
||||
for (const {callStream, callMetadata} of queueCopy) {
|
||||
this.tryPick(callStream, callMetadata);
|
||||
}
|
||||
this.startConnecting();
|
||||
break;
|
||||
case ConnectivityState.READY:
|
||||
this.emit('connect');
|
||||
break;
|
||||
case ConnectivityState.TRANSIENT_FAILURE:
|
||||
this.subChannel = null;
|
||||
this.backoffTimerId = setTimeout(() => {
|
||||
this.transitionToState(
|
||||
[ConnectivityState.TRANSIENT_FAILURE],
|
||||
ConnectivityState.CONNECTING
|
||||
);
|
||||
}, this.currentBackoffDeadline.getTime() - now.getTime());
|
||||
break;
|
||||
case ConnectivityState.IDLE:
|
||||
case ConnectivityState.SHUTDOWN:
|
||||
if (this.subChannel) {
|
||||
this.subChannel.close();
|
||||
this.subChannel.removeListener(
|
||||
'connect',
|
||||
this.subChannelConnectCallback
|
||||
);
|
||||
this.subChannel.removeListener('close', this.subChannelCloseCallback);
|
||||
this.subChannel = null;
|
||||
this.emit('shutdown');
|
||||
clearTimeout(this.backoffTimerId);
|
||||
}
|
||||
break;
|
||||
default:
|
||||
throw new Error('This should never happen');
|
||||
}
|
||||
}
|
||||
|
||||
// Transition from any of a set of oldStates to a specific newState
|
||||
private transitionToState(
|
||||
oldStates: ConnectivityState[],
|
||||
newState: ConnectivityState
|
||||
): void {
|
||||
if (oldStates.indexOf(this.connectivityState) > -1) {
|
||||
const oldState: ConnectivityState = this.connectivityState;
|
||||
this.connectivityState = newState;
|
||||
this.handleStateChange(oldState, newState);
|
||||
this.emit('connectivityStateChanged', newState);
|
||||
}
|
||||
}
|
||||
|
||||
private startConnecting(): void {
|
||||
const connectionOptions: http2.SecureClientSessionOptions =
|
||||
this.credentials._getConnectionOptions() || {};
|
||||
if (connectionOptions.secureContext !== null) {
|
||||
// If provided, the value of grpc.ssl_target_name_override should be used
|
||||
// to override the target hostname when checking server identity.
|
||||
// This option is used for testing only.
|
||||
if (this.options['grpc.ssl_target_name_override']) {
|
||||
const sslTargetNameOverride = this.options[
|
||||
'grpc.ssl_target_name_override'
|
||||
]!;
|
||||
connectionOptions.checkServerIdentity = (
|
||||
host: string,
|
||||
cert: PeerCertificate
|
||||
): Error | undefined => {
|
||||
return checkServerIdentity(sslTargetNameOverride, cert);
|
||||
};
|
||||
connectionOptions.servername = sslTargetNameOverride;
|
||||
this.updateState(connectivityState);
|
||||
},
|
||||
requestReresolution: () => {
|
||||
// This should never be called.
|
||||
throw new Error('Resolving load balancer should never call requestReresolution');
|
||||
}
|
||||
}
|
||||
const subChannel: Http2SubChannel = new Http2SubChannel(
|
||||
this.target,
|
||||
connectionOptions,
|
||||
this.userAgent,
|
||||
this.options
|
||||
);
|
||||
this.subChannel = subChannel;
|
||||
const now = new Date();
|
||||
const connectionTimeout: number = Math.max(
|
||||
this.currentBackoffDeadline.getTime() - now.getTime(),
|
||||
MIN_CONNECT_TIMEOUT_MS
|
||||
);
|
||||
const connectionTimerId: NodeJS.Timer = setTimeout(() => {
|
||||
// This should trigger the 'close' event, which will send us back to
|
||||
// TRANSIENT_FAILURE
|
||||
subChannel.close();
|
||||
}, connectionTimeout);
|
||||
this.subChannelConnectCallback = () => {
|
||||
// Connection succeeded
|
||||
clearTimeout(connectionTimerId);
|
||||
this.transitionToState(
|
||||
[ConnectivityState.CONNECTING],
|
||||
ConnectivityState.READY
|
||||
);
|
||||
};
|
||||
subChannel.once('connect', this.subChannelConnectCallback);
|
||||
this.subChannelCloseCallback = () => {
|
||||
// Connection failed
|
||||
clearTimeout(connectionTimerId);
|
||||
/* TODO(murgatroid99): verify that this works for
|
||||
* CONNECTING->TRANSITIVE_FAILURE see nodejs/node#16645 */
|
||||
this.transitionToState(
|
||||
[ConnectivityState.CONNECTING, ConnectivityState.READY],
|
||||
ConnectivityState.TRANSIENT_FAILURE
|
||||
);
|
||||
};
|
||||
subChannel.once('close', this.subChannelCloseCallback);
|
||||
}
|
||||
|
||||
constructor(
|
||||
address: string,
|
||||
readonly credentials: ChannelCredentials,
|
||||
private readonly options: Partial<ChannelOptions>
|
||||
) {
|
||||
super();
|
||||
for (const option in options) {
|
||||
if (options.hasOwnProperty(option)) {
|
||||
if (!recognizedOptions.hasOwnProperty(option)) {
|
||||
console.warn(
|
||||
`Unrecognized channel argument '${option}' will be ignored.`
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
if (credentials._isSecure()) {
|
||||
this.target = new url.URL(`https://${address}`);
|
||||
} else {
|
||||
this.target = new url.URL(`http://${address}`);
|
||||
}
|
||||
// TODO(murgatroid99): Add more centralized handling of channel options
|
||||
if (this.options['grpc.default_authority']) {
|
||||
this.defaultAuthority = this.options['grpc.default_authority'] as string;
|
||||
} else {
|
||||
this.defaultAuthority = this.target.host;
|
||||
}
|
||||
// TODO: check channel arg for default service config
|
||||
this.resolvingLoadBalancer = new ResolvingLoadBalancer(target, channelControlHelper, null);
|
||||
this.filterStackFactory = new FilterStackFactory([
|
||||
new CallCredentialsFilterFactory(this),
|
||||
new DeadlineFilterFactory(this),
|
||||
new MetadataStatusFilterFactory(this),
|
||||
new CompressionFilterFactory(this),
|
||||
]);
|
||||
this.currentBackoffDeadline = new Date();
|
||||
/* The only purpose of these lines is to ensure that this.backoffTimerId has
|
||||
* a value of type NodeJS.Timer. */
|
||||
this.backoffTimerId = setTimeout(() => {}, 0);
|
||||
|
||||
// Build user-agent string.
|
||||
this.userAgent = [
|
||||
options['grpc.primary_user_agent'],
|
||||
`grpc-node-js/${clientVersion}`,
|
||||
options['grpc.secondary_user_agent'],
|
||||
]
|
||||
.filter(e => e)
|
||||
.join(' '); // remove falsey values first
|
||||
// TODO(murgatroid99): Add more centralized handling of channel options
|
||||
if (this.options['grpc.default_authority']) {
|
||||
this.defaultAuthority = this.options['grpc.default_authority'] as string;
|
||||
} else {
|
||||
this.defaultAuthority = getDefaultAuthority(target);
|
||||
}
|
||||
}
|
||||
|
||||
_startHttp2Stream(
|
||||
authority: string,
|
||||
methodName: string,
|
||||
stream: Http2CallStream,
|
||||
metadata: Metadata
|
||||
) {
|
||||
const connectMetadata: Promise<Metadata> = this.connect().then(() =>
|
||||
metadata.clone()
|
||||
);
|
||||
const finalMetadata: Promise<Metadata> = stream.filterStack.sendMetadata(
|
||||
connectMetadata
|
||||
);
|
||||
finalMetadata
|
||||
.then(metadataValue => {
|
||||
const headers = metadataValue.toHttp2Headers();
|
||||
headers[HTTP2_HEADER_AUTHORITY] = authority;
|
||||
headers[HTTP2_HEADER_USER_AGENT] = this.userAgent;
|
||||
headers[HTTP2_HEADER_CONTENT_TYPE] = 'application/grpc';
|
||||
headers[HTTP2_HEADER_METHOD] = 'POST';
|
||||
headers[HTTP2_HEADER_PATH] = methodName;
|
||||
headers[HTTP2_HEADER_TE] = 'trailers';
|
||||
if (this.connectivityState === ConnectivityState.READY) {
|
||||
const subChannel: Http2SubChannel = this.subChannel!;
|
||||
subChannel.startCallStream(metadataValue, stream);
|
||||
/**
|
||||
* Check the picker output for the given call and corresponding metadata,
|
||||
* and take any relevant actions. Should not be called while iterating
|
||||
* over pickQueue.
|
||||
* @param callStream
|
||||
* @param callMetadata
|
||||
*/
|
||||
private tryPick(callStream: Http2CallStream, callMetadata: Metadata) {
|
||||
const pickResult = this.currentPicker.pick({metadata: callMetadata});
|
||||
switch(pickResult.pickResultType) {
|
||||
case PickResultType.COMPLETE:
|
||||
if (pickResult.subchannel === null) {
|
||||
// End the call with an error
|
||||
} else {
|
||||
/* In this case, we lost the connection while finalizing
|
||||
* metadata. That should be very unusual */
|
||||
setImmediate(() => {
|
||||
this._startHttp2Stream(authority, methodName, stream, metadata);
|
||||
/* If the subchannel disconnects between calling pick and getting
|
||||
* the filter stack metadata, the call will end with an error. */
|
||||
callStream.filterStack.sendMetadata(Promise.resolve(new Metadata())).then((finalMetadata) => {
|
||||
if (pickResult.subchannel!.getConnectivityState() === SubchannelConnectivityState.READY) {
|
||||
pickResult.subchannel!.startCallStream(callMetadata, callStream);
|
||||
} else {
|
||||
callStream.cancelWithStatus(Status.UNAVAILABLE, 'Connection dropped while starting call');
|
||||
}
|
||||
},
|
||||
(error: Error & { code: number }) => {
|
||||
// We assume the error code isn't 0 (Status.OK)
|
||||
callStream.cancelWithStatus(
|
||||
error.code || Status.UNKNOWN,
|
||||
`Getting metadata from plugin failed with error: ${error.message}`
|
||||
);
|
||||
});
|
||||
}
|
||||
})
|
||||
.catch((error: Error & { code: number }) => {
|
||||
// We assume the error code isn't 0 (Status.OK)
|
||||
stream.cancelWithStatus(
|
||||
error.code || Status.UNKNOWN,
|
||||
`Getting metadata from plugin failed with error: ${error.message}`
|
||||
);
|
||||
});
|
||||
break;
|
||||
case PickResultType.QUEUE:
|
||||
this.pickQueue.push({callStream, callMetadata});
|
||||
break;
|
||||
case PickResultType.TRANSIENT_FAILURE:
|
||||
if (callMetadata.getOptions().waitForReady) {
|
||||
this.pickQueue.push({callStream, callMetadata});
|
||||
} else {
|
||||
callStream.cancelWithStatus(pickResult.status!.code, pickResult.status!.details);
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
private removeConnectivityStateWatcher(watcherObject: ConnectivityStateWatcher) {
|
||||
const watcherIndex = this.connectivityStateWatchers.findIndex((value) => value === watcherObject);
|
||||
if (watcherIndex >= 0) {
|
||||
this.connectivityStateWatchers.splice(watcherIndex, 1);
|
||||
}
|
||||
}
|
||||
|
||||
private updateState(newState: ConnectivityState): void {
|
||||
this.connectivityState = newState;
|
||||
const watchersCopy = this.connectivityStateWatchers.slice();
|
||||
for (const watcherObject of watchersCopy) {
|
||||
if (newState !== watcherObject.currentState) {
|
||||
watcherObject.callback();
|
||||
clearTimeout(watcherObject.timer);
|
||||
this.removeConnectivityStateWatcher(watcherObject);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
_startCallStream(stream: Http2CallStream, metadata: Metadata) {
|
||||
this.tryPick(stream, metadata);
|
||||
}
|
||||
|
||||
close() {
|
||||
this.resolvingLoadBalancer.destroy();
|
||||
this.updateState(ConnectivityState.SHUTDOWN);
|
||||
}
|
||||
|
||||
getTarget() {
|
||||
return this.target;
|
||||
}
|
||||
|
||||
getConnectivityState() {
|
||||
return this.connectivityState;
|
||||
}
|
||||
|
||||
watchConnectivityState(
|
||||
currentState: ConnectivityState,
|
||||
deadline: Date | number,
|
||||
callback: (error?: Error) => void
|
||||
): void {
|
||||
const deadlineDate: Date = deadline instanceof Date ? deadline : new Date(deadline);
|
||||
const now = new Date();
|
||||
if (deadlineDate <= now) {
|
||||
process.nextTick(callback, new Error('Deadline passed without connectivity state change'));
|
||||
return;
|
||||
}
|
||||
const watcherObject = {
|
||||
currentState,
|
||||
callback,
|
||||
timer: setTimeout(() => {
|
||||
this.removeConnectivityStateWatcher(watcherObject);
|
||||
callback(new Error('Deadline passed without connectivity state change'));
|
||||
}, deadlineDate.getTime() - now.getTime())
|
||||
};
|
||||
this.connectivityStateWatchers.push(watcherObject);
|
||||
}
|
||||
|
||||
createCall(
|
||||
|
|
@ -385,106 +275,9 @@ export class Http2Channel extends EventEmitter implements Channel {
|
|||
method,
|
||||
this,
|
||||
finalOptions,
|
||||
this.filterStackFactory
|
||||
this.filterStackFactory,
|
||||
this.credentials._getCallCredentials()
|
||||
);
|
||||
return stream;
|
||||
}
|
||||
|
||||
/**
|
||||
* Attempts to connect, returning a Promise that resolves when the connection
|
||||
* is successful, or rejects if the channel is shut down.
|
||||
*/
|
||||
private connect(): Promise<void> {
|
||||
if (this.connectivityState === ConnectivityState.READY) {
|
||||
return Promise.resolve();
|
||||
} else if (this.connectivityState === ConnectivityState.SHUTDOWN) {
|
||||
return Promise.reject(new Error('Channel has been shut down'));
|
||||
} else {
|
||||
// In effect, this.connecting is only assigned upon the first attempt to
|
||||
// transition from IDLE to CONNECTING, so this condition could have also
|
||||
// been (connectivityState === IDLE).
|
||||
if (!this.connecting) {
|
||||
this.connecting = new Promise((resolve, reject) => {
|
||||
this.transitionToState(
|
||||
[ConnectivityState.IDLE],
|
||||
ConnectivityState.CONNECTING
|
||||
);
|
||||
const onConnect = () => {
|
||||
this.connecting = null;
|
||||
this.removeListener('shutdown', onShutdown);
|
||||
resolve();
|
||||
};
|
||||
const onShutdown = () => {
|
||||
this.connecting = null;
|
||||
this.removeListener('connect', onConnect);
|
||||
reject(new Error('Channel has been shut down'));
|
||||
};
|
||||
this.once('connect', onConnect);
|
||||
this.once('shutdown', onShutdown);
|
||||
});
|
||||
}
|
||||
return this.connecting;
|
||||
}
|
||||
}
|
||||
|
||||
getConnectivityState(tryToConnect: boolean): ConnectivityState {
|
||||
if (tryToConnect) {
|
||||
this.transitionToState(
|
||||
[ConnectivityState.IDLE],
|
||||
ConnectivityState.CONNECTING
|
||||
);
|
||||
}
|
||||
return this.connectivityState;
|
||||
}
|
||||
|
||||
watchConnectivityState(
|
||||
currentState: ConnectivityState,
|
||||
deadline: Date | number,
|
||||
callback: (error?: Error) => void
|
||||
) {
|
||||
if (this.connectivityState !== currentState) {
|
||||
/* If the connectivity state is different from the provided currentState,
|
||||
* we assume that a state change has successfully occurred */
|
||||
setImmediate(callback);
|
||||
} else {
|
||||
let deadlineMs = 0;
|
||||
if (deadline instanceof Date) {
|
||||
deadlineMs = deadline.getTime();
|
||||
} else {
|
||||
deadlineMs = deadline;
|
||||
}
|
||||
let timeout: number = deadlineMs - Date.now();
|
||||
if (timeout < 0) {
|
||||
timeout = 0;
|
||||
}
|
||||
const timeoutId = setTimeout(() => {
|
||||
this.removeListener('connectivityStateChanged', eventCb);
|
||||
callback(new Error('Channel state did not change before deadline'));
|
||||
}, timeout);
|
||||
const eventCb = () => {
|
||||
clearTimeout(timeoutId);
|
||||
callback();
|
||||
};
|
||||
this.once('connectivityStateChanged', eventCb);
|
||||
}
|
||||
}
|
||||
|
||||
getTarget() {
|
||||
return this.target.toString();
|
||||
}
|
||||
|
||||
close() {
|
||||
if (this.connectivityState === ConnectivityState.SHUTDOWN) {
|
||||
throw new Error('Channel has been shut down');
|
||||
}
|
||||
this.transitionToState(
|
||||
[
|
||||
ConnectivityState.CONNECTING,
|
||||
ConnectivityState.READY,
|
||||
ConnectivityState.TRANSIENT_FAILURE,
|
||||
ConnectivityState.IDLE,
|
||||
],
|
||||
ConnectivityState.SHUTDOWN
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -16,7 +16,7 @@
|
|||
*/
|
||||
|
||||
import { Call } from './call-stream';
|
||||
import { ConnectivityState, Http2Channel } from './channel';
|
||||
import { ConnectivityState, Channel } from './channel';
|
||||
import { Status } from './constants';
|
||||
import { BaseFilter, Filter, FilterFactory } from './filter';
|
||||
import { Metadata } from './metadata';
|
||||
|
|
@ -44,7 +44,7 @@ export class DeadlineFilter extends BaseFilter implements Filter {
|
|||
private timer: NodeJS.Timer | null = null;
|
||||
private deadline: number;
|
||||
constructor(
|
||||
private readonly channel: Http2Channel,
|
||||
private readonly channel: Channel,
|
||||
private readonly callStream: Call
|
||||
) {
|
||||
super();
|
||||
|
|
@ -85,7 +85,7 @@ export class DeadlineFilter extends BaseFilter implements Filter {
|
|||
}
|
||||
|
||||
export class DeadlineFilterFactory implements FilterFactory<DeadlineFilter> {
|
||||
constructor(private readonly channel: Http2Channel) {}
|
||||
constructor(private readonly channel: Channel) {}
|
||||
|
||||
createFilter(callStream: Call): DeadlineFilter {
|
||||
return new DeadlineFilter(this.channel, callStream);
|
||||
|
|
|
|||
|
|
@ -162,7 +162,7 @@ export class PickFirstLoadBalancer implements LoadBalancer {
|
|||
}
|
||||
}
|
||||
|
||||
updateAddressList(addressList: string[], lbConfig?: LoadBalancingConfig): void {
|
||||
updateAddressList(addressList: string[], lbConfig: LoadBalancingConfig | null): void {
|
||||
// lbConfig has no useful information for pick first load balancing
|
||||
this.latestAddressList = addressList;
|
||||
this.connectToAddressList();
|
||||
|
|
@ -191,6 +191,10 @@ export class PickFirstLoadBalancer implements LoadBalancer {
|
|||
getTypeName(): string {
|
||||
return TYPE_NAME;
|
||||
}
|
||||
|
||||
replaceChannelControlHelper(channelControlHelper: ChannelControlHelper) {
|
||||
this.channelControlHelper = channelControlHelper;
|
||||
}
|
||||
}
|
||||
|
||||
export function setup(): void {
|
||||
|
|
|
|||
|
|
@ -31,7 +31,7 @@ export interface ChannelControlHelper {
|
|||
* @param subchannelAddress The address to connect to
|
||||
* @param subchannelArgs Extra channel arguments specified by the load balancer
|
||||
*/
|
||||
createSubchannel(subchannelAddress: String, subchannelArgs: ChannelOptions): Subchannel;
|
||||
createSubchannel(subchannelAddress: string, subchannelArgs: ChannelOptions): Subchannel;
|
||||
/**
|
||||
* Passes a new subchannel picker up to the channel. This is called if either
|
||||
* the connectivity state changes or if a different picker is needed for any
|
||||
|
|
@ -60,7 +60,7 @@ export interface LoadBalancer {
|
|||
* @param lbConfig The load balancing config object from the service config,
|
||||
* if one was provided
|
||||
*/
|
||||
updateAddressList(addressList: string[], lbConfig?: LoadBalancingConfig): void;
|
||||
updateAddressList(addressList: string[], lbConfig: LoadBalancingConfig | null): void;
|
||||
/**
|
||||
* If the load balancer is currently in the IDLE state, start connecting.
|
||||
*/
|
||||
|
|
@ -82,6 +82,11 @@ export interface LoadBalancer {
|
|||
* balancer implementation class was registered with.
|
||||
*/
|
||||
getTypeName(): string;
|
||||
/**
|
||||
* Replace the existing ChannelControlHelper with a new one
|
||||
* @param channelControlHelper The new ChannelControlHelper to use from now on
|
||||
*/
|
||||
replaceChannelControlHelper(channelControlHelper: ChannelControlHelper): void;
|
||||
}
|
||||
|
||||
export interface LoadBalancerConstructor {
|
||||
|
|
|
|||
|
|
@ -62,7 +62,7 @@ function validate(key: string, value?: MetadataValue): void {
|
|||
}
|
||||
}
|
||||
|
||||
interface MetadataOptions {
|
||||
export interface MetadataOptions {
|
||||
/* Signal that the request is idempotent. Defaults to false */
|
||||
idempotentRequest?: boolean;
|
||||
/* Signal that the call should not return UNAVAILABLE before it has
|
||||
|
|
@ -80,8 +80,15 @@ interface MetadataOptions {
|
|||
*/
|
||||
export class Metadata {
|
||||
protected internalRepr: MetadataObject = new Map<string, MetadataValue[]>();
|
||||
private options: MetadataOptions;
|
||||
|
||||
constructor(private options?: MetadataOptions) {}
|
||||
constructor(options?: MetadataOptions) {
|
||||
if (options === undefined) {
|
||||
this.options = {};
|
||||
} else {
|
||||
this.options = options;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the given value for the given key by replacing any other values
|
||||
|
|
@ -200,6 +207,10 @@ export class Metadata {
|
|||
this.options = options;
|
||||
}
|
||||
|
||||
getOptions(): MetadataOptions {
|
||||
return this.options;
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates an OutgoingHttpHeaders object that can be used with the http2 API.
|
||||
*/
|
||||
|
|
|
|||
|
|
@ -20,6 +20,7 @@ import * as util from 'util';
|
|||
import { extractAndSelectServiceConfig, ServiceConfig } from './service-config';
|
||||
import { ServiceError } from './call';
|
||||
import { Status } from './constants';
|
||||
import { URL } from 'url';
|
||||
|
||||
/* These regular expressions match IP addresses with optional ports in different
|
||||
* formats. In each case, capture group 1 contains the address, and capture
|
||||
|
|
@ -87,7 +88,6 @@ class DnsResolver implements Resolver {
|
|||
}
|
||||
}
|
||||
this.percentage = Math.random() * 100;
|
||||
this.startResolution();
|
||||
}
|
||||
|
||||
private startResolution() {
|
||||
|
|
@ -140,6 +140,10 @@ class DnsResolver implements Resolver {
|
|||
this.startResolution();
|
||||
}
|
||||
}
|
||||
|
||||
static getDefaultAuthority(target: string): string {
|
||||
return new URL(target).hostname;
|
||||
}
|
||||
}
|
||||
|
||||
export function setup(): void {
|
||||
|
|
|
|||
|
|
@ -29,6 +29,7 @@ export interface Resolver {
|
|||
|
||||
export interface ResolverConstructor {
|
||||
new(target: string, listener: ResolverListener): Resolver;
|
||||
getDefaultAuthority(target:string): string;
|
||||
}
|
||||
|
||||
const registeredResolvers: {[prefix: string]: ResolverConstructor} = {};
|
||||
|
|
@ -52,4 +53,13 @@ export function createResolver(target: string, listener: ResolverListener): Reso
|
|||
return new defaultResolver(target, listener);
|
||||
}
|
||||
throw new Error('No resolver could be created for the provided target');
|
||||
}
|
||||
|
||||
export function getDefaultAuthority(target: string): string {
|
||||
for (const prefix of Object.keys(registerDefaultResolver)) {
|
||||
if (target.startsWith(prefix)) {
|
||||
return registeredResolvers[prefix].getDefaultAuthority(target);
|
||||
}
|
||||
}
|
||||
throw new Error(`Invalid target "${target}"`);
|
||||
}
|
||||
|
|
@ -15,16 +15,23 @@
|
|||
*
|
||||
*/
|
||||
|
||||
import { ChannelControlHelper, LoadBalancer, isLoadBalancerNameRegistered } from "./load-balancer";
|
||||
import { ChannelControlHelper, LoadBalancer, isLoadBalancerNameRegistered, createLoadBalancer } from "./load-balancer";
|
||||
import { ServiceConfig } from "./service-config";
|
||||
import { ConnectivityState } from "./channel";
|
||||
import { createResolver, Resolver } from "./resolver";
|
||||
import { ServiceError } from "./call";
|
||||
import { ChannelOptions } from "./channel-options";
|
||||
import { Picker, UnavailablePicker, QueuePicker } from "./picker";
|
||||
import { LoadBalancingConfig } from "./load-balancing-config";
|
||||
|
||||
const DEFAULT_LOAD_BALANCER_NAME = 'pick_first';
|
||||
|
||||
export class ResolvingLoadBalancer {
|
||||
export class ResolvingLoadBalancer implements LoadBalancer {
|
||||
private innerResolver: Resolver;
|
||||
/**
|
||||
* Current internal load balancer used for handling calls.
|
||||
* Invariant: innerLoadBalancer === null => pendingReplacementLoadBalancer === null.
|
||||
*/
|
||||
private innerLoadBalancer: LoadBalancer | null = null;
|
||||
private pendingReplacementLoadBalancer: LoadBalancer | null = null;
|
||||
private currentState: ConnectivityState = ConnectivityState.IDLE;
|
||||
|
|
@ -36,8 +43,32 @@ export class ResolvingLoadBalancer {
|
|||
*/
|
||||
private previousServiceConfig: ServiceConfig | null | undefined = undefined;
|
||||
|
||||
constructor (private target: string, private channelControlHelper: ChannelControlHelper, private defaultServiceConfig: ServiceConfig | null) {
|
||||
private innerBalancerState: ConnectivityState = ConnectivityState.IDLE;
|
||||
|
||||
/**
|
||||
* The most recent reported state of the pendingReplacementLoadBalancer.
|
||||
* Starts at IDLE for type simplicity. This should get updated as soon as the
|
||||
* pendingReplacementLoadBalancer gets constructed.
|
||||
*/
|
||||
private replacementBalancerState: ConnectivityState = ConnectivityState.IDLE;
|
||||
/**
|
||||
* The picker associated with the replacementBalancerState. Starts as an
|
||||
* UnavailablePicker for type simplicity. This should get updated as soon as
|
||||
* the pendingReplacementLoadBalancer gets constructed.
|
||||
*/
|
||||
private replacementBalancerPicker: Picker = new UnavailablePicker();
|
||||
|
||||
/**
|
||||
* ChannelControlHelper for the innerLoadBalancer.
|
||||
*/
|
||||
private readonly innerChannelControlHelper: ChannelControlHelper;
|
||||
/**
|
||||
* ChannelControlHelper for the pendingReplacementLoadBalancer.
|
||||
*/
|
||||
private readonly replacementChannelControlHelper: ChannelControlHelper;
|
||||
|
||||
constructor (private target: string, private channelControlHelper: ChannelControlHelper, private defaultServiceConfig: ServiceConfig | null) {
|
||||
this.channelControlHelper.updateState(ConnectivityState.IDLE, new QueuePicker(this));
|
||||
this.innerResolver = createResolver(target, {
|
||||
onSuccessfulResolution: (addressList: string[], serviceConfig: ServiceConfig | null, serviceConfigError: ServiceError | null) => {
|
||||
let workingServiceConfig: ServiceConfig | null = null;
|
||||
|
|
@ -57,14 +88,17 @@ export class ResolvingLoadBalancer {
|
|||
this.previousServiceConfig = serviceConfig;
|
||||
}
|
||||
let loadBalancerName: string | null = null;
|
||||
let loadBalancingConfig: LoadBalancingConfig | null = null;
|
||||
if (workingServiceConfig === null || workingServiceConfig.loadBalancingConfig.length === 0) {
|
||||
loadBalancerName = DEFAULT_LOAD_BALANCER_NAME;
|
||||
} else {
|
||||
for (const lbConfig of workingServiceConfig.loadBalancingConfig) {
|
||||
// Iterating through a oneof looking for whichever one is populated
|
||||
for (const key in lbConfig) {
|
||||
if (Object.prototype.hasOwnProperty.call(lbConfig, key)) {
|
||||
if (isLoadBalancerNameRegistered(key)) {
|
||||
loadBalancerName = key;
|
||||
loadBalancingConfig = lbConfig;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
|
@ -79,14 +113,112 @@ export class ResolvingLoadBalancer {
|
|||
return;
|
||||
}
|
||||
}
|
||||
if (this.innerLoadBalancer === null) {
|
||||
this.innerLoadBalancer = createLoadBalancer(loadBalancerName, this.innerChannelControlHelper)!;
|
||||
this.innerLoadBalancer.updateAddressList(addressList, loadBalancingConfig);
|
||||
} else if (this.innerLoadBalancer.getTypeName() === loadBalancerName) {
|
||||
this.innerLoadBalancer.updateAddressList(addressList, loadBalancingConfig);
|
||||
} else {
|
||||
if (this.pendingReplacementLoadBalancer === null || this.pendingReplacementLoadBalancer.getTypeName() !== loadBalancerName) {
|
||||
if (this.pendingReplacementLoadBalancer !== null) {
|
||||
this.pendingReplacementLoadBalancer.destroy();
|
||||
}
|
||||
this.pendingReplacementLoadBalancer = createLoadBalancer(loadBalancerName, this.replacementChannelControlHelper)!;
|
||||
}
|
||||
this.pendingReplacementLoadBalancer.updateAddressList(addressList, loadBalancingConfig);
|
||||
}
|
||||
},
|
||||
onError: (error: ServiceError) => {
|
||||
this.handleResolutionFailure(error);
|
||||
}
|
||||
});
|
||||
|
||||
this.innerChannelControlHelper = {
|
||||
createSubchannel: (subchannelAddress: string, subchannelArgs: ChannelOptions) => {
|
||||
return this.channelControlHelper.createSubchannel(subchannelAddress, subchannelArgs);
|
||||
},
|
||||
updateState: (connectivityState: ConnectivityState, picker: Picker) => {
|
||||
this.innerBalancerState = connectivityState;
|
||||
if (connectivityState === ConnectivityState.TRANSIENT_FAILURE && this.pendingReplacementLoadBalancer !== null) {
|
||||
this.switchOverReplacementBalancer();
|
||||
} else {
|
||||
this.channelControlHelper.updateState(connectivityState, picker);
|
||||
}
|
||||
},
|
||||
requestReresolution: () => {
|
||||
if (this.pendingReplacementLoadBalancer === null) {
|
||||
this.innerResolver.updateResolution();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
this.replacementChannelControlHelper = {
|
||||
createSubchannel: (subchannelAddress: string, subchannelArgs: ChannelOptions) => {
|
||||
return this.channelControlHelper.createSubchannel(subchannelAddress, subchannelArgs);
|
||||
},
|
||||
updateState: (connectivityState: ConnectivityState, picker: Picker) => {
|
||||
this.replacementBalancerState = connectivityState;
|
||||
this.replacementBalancerPicker = picker;
|
||||
if (connectivityState === ConnectivityState.READY) {
|
||||
this.switchOverReplacementBalancer();
|
||||
}
|
||||
},
|
||||
requestReresolution: () => {
|
||||
this.innerResolver.updateResolution();
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Stop using the current innerLoadBalancer and replace it with the
|
||||
* pendingReplacementLoadBalancer. Must only be called if both of
|
||||
* those are currently not null.
|
||||
*/
|
||||
private switchOverReplacementBalancer() {
|
||||
this.innerLoadBalancer!.destroy();
|
||||
this.innerLoadBalancer = this.pendingReplacementLoadBalancer!;
|
||||
this.innerLoadBalancer.replaceChannelControlHelper(this.innerChannelControlHelper);
|
||||
this.pendingReplacementLoadBalancer = null;
|
||||
this.innerBalancerState = this.replacementBalancerState;
|
||||
this.channelControlHelper.updateState(this.replacementBalancerState, this.replacementBalancerPicker);
|
||||
}
|
||||
|
||||
private handleResolutionFailure(error: ServiceError) {
|
||||
|
||||
}
|
||||
|
||||
exitIdle() {
|
||||
this.innerResolver.updateResolution();
|
||||
if (this.innerLoadBalancer !== null) {
|
||||
this.innerLoadBalancer.exitIdle();
|
||||
}
|
||||
}
|
||||
|
||||
updateAddressList(addressList: string[], lbConfig: LoadBalancingConfig | null) {
|
||||
throw new Error('updateAddressList not supported on ResolvingLoadBalancer');
|
||||
}
|
||||
|
||||
resetBackoff() {
|
||||
// TODO
|
||||
}
|
||||
|
||||
destroy() {
|
||||
if (this.innerLoadBalancer !== null) {
|
||||
this.innerLoadBalancer.destroy();
|
||||
this.innerLoadBalancer = null;
|
||||
}
|
||||
if (this.pendingReplacementLoadBalancer !== null) {
|
||||
this.pendingReplacementLoadBalancer.destroy();
|
||||
this.pendingReplacementLoadBalancer = null;
|
||||
}
|
||||
// Go to another state?
|
||||
}
|
||||
|
||||
getTypeName() {
|
||||
return 'resolving_load_balancer';
|
||||
}
|
||||
|
||||
replaceChannelControlHelper(channelControlHelper: ChannelControlHelper) {
|
||||
this.channelControlHelper = channelControlHelper;
|
||||
}
|
||||
}
|
||||
|
|
@ -73,6 +73,10 @@ export class SubchannelPool {
|
|||
|
||||
const globalSubchannelPool = new SubchannelPool(true);
|
||||
|
||||
export function getOrCreateSubchannel(channelTarget: string, subchannelTarget: string, channelArguments: ChannelOptions, channelCredentials: ChannelCredentials): Subchannel {
|
||||
return globalSubchannelPool.getOrCreateSubchannel(channelTarget, subchannelTarget, channelArguments, channelCredentials);
|
||||
export function getSubchannelPool(global: boolean): SubchannelPool {
|
||||
if (global) {
|
||||
return globalSubchannelPool;
|
||||
} else {
|
||||
return new SubchannelPool(false);
|
||||
}
|
||||
}
|
||||
Loading…
Reference in New Issue