Create separate subchannel class, fix default keepalive time value

This commit is contained in:
murgatroid99 2018-08-03 13:43:23 -07:00
parent 8bbd4752d9
commit 500642b5ac
2 changed files with 143 additions and 94 deletions

View File

@ -13,6 +13,7 @@ import {DeadlineFilterFactory} from './deadline-filter';
import {FilterStackFactory} from './filter-stack';
import {Metadata, MetadataObject} from './metadata';
import {MetadataStatusFilterFactory} from './metadata-status-filter';
import { Http2SubChannel } from './subchannel';
const {version: clientVersion} = require('../../package');
@ -24,9 +25,6 @@ const BACKOFF_MULTIPLIER = 1.6;
const MAX_BACKOFF_MS = 120000;
const BACKOFF_JITTER = 0.2;
const KEEPALIVE_TIME_MS = 1 << 31;
const KEEPALIVE_TIMEOUT_MS = 20000;
const {
HTTP2_HEADER_AUTHORITY,
HTTP2_HEADER_CONTENT_TYPE,
@ -45,6 +43,8 @@ export interface ChannelOptions {
'grpc.primary_user_agent': string;
'grpc.secondary_user_agent': string;
'grpc.default_authority': string;
'grpc.keealive_time_ms': number;
'grpc.keepalive_timeout_ms': number;
[key: string]: string|number;
}
@ -85,15 +85,6 @@ export interface Channel extends EventEmitter {
/* tslint:enable:no-any */
}
/* This should be a real subchannel class that contains a ClientHttp2Session,
* but for now this serves its purpose */
type Http2SubChannel = http2.ClientHttp2Session&{
/* Count the number of currently active streams associated with the session.
* The purpose of this is to keep the session reffed if and only if there
* is at least one active stream */
streamCount?: number;
};
export class Http2Channel extends EventEmitter implements Channel {
private readonly userAgent: string;
private readonly target: url.URL;
@ -113,12 +104,6 @@ export class Http2Channel extends EventEmitter implements Channel {
private currentBackoff: number = INITIAL_BACKOFF_MS;
private currentBackoffDeadline: Date;
private keepaliveTimeMs: number = KEEPALIVE_TIME_MS;
private keepaliveTimeoutMs: number = KEEPALIVE_TIMEOUT_MS;
private keepaliveIntervalId: NodeJS.Timer;
private keepaliveTimeoutId: NodeJS.Timer;
private handleStateChange(
oldState: ConnectivityState, newState: ConnectivityState): void {
const now: Date = new Date();
@ -143,8 +128,6 @@ export class Http2Channel extends EventEmitter implements Channel {
break;
case ConnectivityState.TRANSIENT_FAILURE:
this.subChannel = null;
/* Stop keepalive pings when the subchannel disconnects */
this.stopKeepalivePings();
this.backoffTimerId = setTimeout(() => {
this.transitionToState(
[ConnectivityState.TRANSIENT_FAILURE],
@ -161,7 +144,6 @@ export class Http2Channel extends EventEmitter implements Channel {
this.subChannel = null;
this.emit('shutdown');
clearTimeout(this.backoffTimerId);
this.stopKeepalivePings();
}
break;
default:
@ -181,14 +163,10 @@ export class Http2Channel extends EventEmitter implements Channel {
}
private startConnecting(): void {
let subChannel: Http2SubChannel;
const secureContext = this.credentials.getSecureContext();
if (secureContext === null) {
subChannel = http2.connect(this.target);
} else {
const connectionOptions: http2.SecureClientSessionOptions = {
secureContext,
};
let connectionOptions: http2.SecureClientSessionOptions = {};
if (secureContext !== null) {
connectionOptions.secureContext = secureContext;
// If provided, the value of grpc.ssl_target_name_override should be used
// to override the target hostname when checking server identity.
// This option is used for testing only.
@ -201,8 +179,8 @@ export class Http2Channel extends EventEmitter implements Channel {
};
connectionOptions.servername = sslTargetNameOverride;
}
subChannel = http2.connect(this.target, connectionOptions);
}
const subChannel: Http2SubChannel = new Http2SubChannel(this.target, connectionOptions, this.userAgent, this.options);
this.subChannel = subChannel;
const now = new Date();
const connectionTimeout: number = Math.max(
@ -230,33 +208,6 @@ export class Http2Channel extends EventEmitter implements Channel {
ConnectivityState.TRANSIENT_FAILURE);
};
subChannel.once('close', this.subChannelCloseCallback);
subChannel.once('error', this.subChannelCloseCallback);
}
private sendPing() {
this.keepaliveTimeoutId = setTimeout(() => {
this.transitionToState([ConnectivityState.READY],
ConnectivityState.TRANSIENT_FAILURE)
}, this.keepaliveTimeoutMs)
this.subChannel!.ping((err: Error | null, duration: number, payload: Buffer) => {
clearTimeout(this.keepaliveTimeoutId);
});
}
/* TODO(murgatroid99): refactor subchannels so that keepalives can be handled
* per subchannel */
private startKeepalivePings() {
this.keepaliveIntervalId = setInterval(() => {
if (this.subChannel) {
this.sendPing();
}
}, this.keepaliveTimeMs);
this.sendPing();
}
private stopKeepalivePings() {
clearInterval(this.keepaliveIntervalId);
clearTimeout(this.keepaliveTimeoutId);
}
constructor(
@ -278,21 +229,10 @@ export class Http2Channel extends EventEmitter implements Channel {
new CallCredentialsFilterFactory(this), new DeadlineFilterFactory(this),
new MetadataStatusFilterFactory(this), new CompressionFilterFactory(this)
]);
if (this.options['grpc.keepalive_time_ms']) {
this.keepaliveTimeMs = this.options['grpc.keepalive_time_ms'] as number;
}
if (this.options['grpc.keepalive_timeout_ms']) {
this.keepaliveTimeoutMs = this.options['grpc.keepalive_timeout_ms'] as number;
}
this.currentBackoffDeadline = new Date();
/* The only purpose of these lines is to ensure that this.backoffTimerId has
* a value of type NodeJS.Timer. */
this.backoffTimerId = setTimeout(() => {}, 0);
clearTimeout(this.backoffTimerId);
this.keepaliveIntervalId = setTimeout(() => {}, 0);
clearTimeout(this.keepaliveIntervalId);
this.keepaliveTimeoutId = setTimeout(() => {}, 0);
clearTimeout(this.keepaliveTimeoutId);
// Build user-agent string.
this.userAgent = [
@ -316,33 +256,8 @@ export class Http2Channel extends EventEmitter implements Channel {
headers[HTTP2_HEADER_PATH] = methodName;
headers[HTTP2_HEADER_TE] = 'trailers';
if (this.connectivityState === ConnectivityState.READY) {
const session: Http2SubChannel = this.subChannel!;
let http2Stream = session.request(headers);
/* This is a very ad-hoc reference counting scheme. This should be
* handled by a subchannel class */
session.ref();
if (!session.streamCount) {
session.streamCount = 0;
}
if (session.streamCount == 0) {
/* Start keepalive pings when we start a stream on an empty
* session */
this.startKeepalivePings();
}
session.streamCount += 1;
http2Stream.on('close', () => {
if (!session.streamCount) {
session.streamCount = 0;
}
session.streamCount -= 1;
if (session.streamCount <= 0) {
session.unref();
/* Stop keepalive pings when we end the last stream on a
* session */
this.stopKeepalivePings();
}
});
stream.attachHttp2Stream(http2Stream);
const subChannel: Http2SubChannel = this.subChannel!;
subChannel.startCallStream(metadataValue, stream);
} else {
/* In this case, we lost the connection while finalizing
* metadata. That should be very unusual */

View File

@ -0,0 +1,134 @@
import * as http2 from 'http2';
import * as url from 'url';
import { EventEmitter } from "events";
import { Metadata } from "./metadata";
import { CallStream, CallOptions, Http2CallStream } from "./call-stream";
import { EmitterAugmentation1, EmitterAugmentation0 } from "./events";
import { ChannelOptions } from './channel';
const {
HTTP2_HEADER_AUTHORITY,
HTTP2_HEADER_CONTENT_TYPE,
HTTP2_HEADER_METHOD,
HTTP2_HEADER_PATH,
HTTP2_HEADER_SCHEME,
HTTP2_HEADER_TE,
HTTP2_HEADER_USER_AGENT
} = http2.constants;
/* setInterval and setTimeout only accept signed 32 bit integers. JS doesn't
* have a constant for the max signed 32 bit integer, so this is a simple way
* to calculate it */
const KEEPALIVE_TIME_MS = ~(1 << 31);
const KEEPALIVE_TIMEOUT_MS = 20000;
export interface SubChannel extends EventEmitter {
/**
* Attach a call stream to this subchannel's connection to start it
* @param headers The headers to start the stream with
* @param callStream The stream to start
*/
startCallStream(metadata: Metadata, callStream: CallStream): void;
close(): void;
}
export class Http2SubChannel extends EventEmitter implements SubChannel {
private session: http2.ClientHttp2Session;
private refCount: number = 0;
private userAgent: string;
private keepaliveTimeMs: number = KEEPALIVE_TIME_MS;
private keepaliveTimeoutMs: number = KEEPALIVE_TIMEOUT_MS;
private keepaliveIntervalId: NodeJS.Timer;
private keepaliveTimeoutId: NodeJS.Timer;
constructor(target: url.URL, connectionOptions: http2.SecureClientSessionOptions,
userAgent: string, channelArgs: Partial<ChannelOptions>) {
super();
this.session = http2.connect(target, connectionOptions);
this.session.on('connect', () => {
this.emit('connect');
});
this.session.on('close', () => {
this.stopKeepalivePings();
this.emit('close');
});
this.session.on('error', () => {
this.stopKeepalivePings();
this.emit('close');
})
this.userAgent = userAgent;
if (channelArgs['grpc.keepalive_time_ms']) {
this.keepaliveTimeMs = channelArgs['grpc.keepalive_time_ms'] as number;
}
if (channelArgs['grpc.keepalive_timeout_ms']) {
this.keepaliveTimeoutMs = channelArgs['grpc.keepalive_timeout_ms'] as number;
}
this.keepaliveIntervalId = setTimeout(() => {}, 0);
clearTimeout(this.keepaliveIntervalId);
this.keepaliveTimeoutId = setTimeout(() => {}, 0);
clearTimeout(this.keepaliveTimeoutId);
}
private ref() {
if (this.refCount === 0) {
this.session.ref();
this.startKeepalivePings();
}
this.refCount += 1;
}
private unref() {
this.refCount -= 1;
if (this.refCount === 0) {
this.session.unref();
this.stopKeepalivePings();
}
}
private sendPing() {
this.keepaliveTimeoutId = setTimeout(() => {
this.emit('close');
}, this.keepaliveTimeoutMs);
this.session.ping((err: Error | null, duration: number, payload: Buffer) => {
clearTimeout(this.keepaliveTimeoutId);
});
}
/* TODO(murgatroid99): refactor subchannels so that keepalives can be handled
* per subchannel */
private startKeepalivePings() {
this.keepaliveIntervalId = setInterval(() => {
this.sendPing();
}, this.keepaliveTimeMs);
this.sendPing();
}
private stopKeepalivePings() {
clearInterval(this.keepaliveIntervalId);
clearTimeout(this.keepaliveTimeoutId);
}
// Prerequisite: this subchannel is connected
startCallStream(metadata: Metadata, callStream: Http2CallStream) {
const headers = metadata.toHttp2Headers();
headers[HTTP2_HEADER_AUTHORITY] = callStream.getHost();
headers[HTTP2_HEADER_USER_AGENT] = this.userAgent;
headers[HTTP2_HEADER_CONTENT_TYPE] = 'application/grpc';
headers[HTTP2_HEADER_METHOD] = 'POST';
headers[HTTP2_HEADER_PATH] = callStream.getMethod();
headers[HTTP2_HEADER_TE] = 'trailers';
let http2Stream = this.session.request(headers);
this.ref();
http2Stream.on('close', () => {
this.unref();
});
callStream.attachHttp2Stream(http2Stream);
}
close() {
this.session.close();
}
}