feat: channelz improvements, idle timeout implementation

This commit is contained in:
AVVS 2024-02-27 12:27:25 -08:00
parent 210967ffa3
commit e0b900dd69
No known key found for this signature in database
GPG Key ID: 2B890ABACCC3E369
13 changed files with 813 additions and 546 deletions

View File

@ -50,6 +50,7 @@
"@typescript-eslint/explicit-module-boundary-types": "off",
"@typescript-eslint/ban-types": "off",
"@typescript-eslint/camelcase": "off",
"@typescript-eslint/no-explicit-any": "off",
"node/no-missing-import": "off",
"node/no-empty-function": "off",
"node/no-unsupported-features/es-syntax": "off",

View File

@ -35,14 +35,17 @@ const pkgPath = path.resolve(jsCoreDir, 'package.json');
const supportedVersionRange = require(pkgPath).engines.node;
const versionNotSupported = () => {
console.log(`Skipping grpc-js task for Node ${process.version}`);
return () => { return Promise.resolve(); };
return () => {
return Promise.resolve();
};
};
const identity = (value: any): any => value;
const checkTask = semver.satisfies(process.version, supportedVersionRange) ?
identity : versionNotSupported;
const checkTask = semver.satisfies(process.version, supportedVersionRange)
? identity
: versionNotSupported;
const execNpmVerb = (verb: string, ...args: string[]) =>
execa('npm', [verb, ...args], {cwd: jsCoreDir, stdio: 'inherit'});
execa('npm', [verb, ...args], { cwd: jsCoreDir, stdio: 'inherit' });
const execNpmCommand = execNpmVerb.bind(null, 'run');
const install = checkTask(() => execNpmVerb('install', '--unsafe-perm'));
@ -64,22 +67,20 @@ const cleanAll = gulp.parallel(clean);
*/
const compile = checkTask(() => execNpmCommand('compile'));
const copyTestFixtures = checkTask(() => ncpP(`${jsCoreDir}/test/fixtures`, `${outDir}/test/fixtures`));
const copyTestFixtures = checkTask(() =>
ncpP(`${jsCoreDir}/test/fixtures`, `${outDir}/test/fixtures`)
);
const runTests = checkTask(() => {
process.env.GRPC_EXPERIMENTAL_ENABLE_OUTLIER_DETECTION = 'true';
return gulp.src(`${outDir}/test/**/*.js`)
.pipe(mocha({reporter: 'mocha-jenkins-reporter',
require: ['ts-node/register']}));
return gulp.src(`${outDir}/test/**/*.js`).pipe(
mocha({
reporter: 'mocha-jenkins-reporter',
require: ['ts-node/register'],
})
);
});
const test = gulp.series(install, copyTestFixtures, runTests);
export {
install,
lint,
clean,
cleanAll,
compile,
test
}
export { install, lint, clean, cleanAll, compile, test };

View File

@ -6,7 +6,7 @@
"repository": "https://github.com/grpc/grpc-node/tree/master/packages/grpc-js",
"main": "build/src/index.js",
"engines": {
"node": "^8.13.0 || >=10.10.0"
"node": ">=12.10.0"
},
"keywords": [],
"author": {
@ -15,17 +15,18 @@
"types": "build/src/index.d.ts",
"license": "Apache-2.0",
"devDependencies": {
"@types/gulp": "^4.0.6",
"@types/gulp-mocha": "0.0.32",
"@types/lodash": "^4.14.186",
"@types/mocha": "^5.2.6",
"@types/ncp": "^2.0.1",
"@types/pify": "^3.0.2",
"@types/semver": "^7.3.9",
"@typescript-eslint/eslint-plugin": "^5.59.11",
"@typescript-eslint/parser": "^5.59.11",
"@typescript-eslint/typescript-estree": "^5.59.11",
"clang-format": "^1.0.55",
"@types/gulp": "^4.0.17",
"@types/gulp-mocha": "0.0.37",
"@types/lodash": "^4.14.202",
"@types/mocha": "^10.0.6",
"@types/ncp": "^2.0.8",
"@types/pify": "^5.0.4",
"@types/semver": "^7.5.8",
"@types/node": ">=20.11.20",
"@typescript-eslint/eslint-plugin": "^7.1.0",
"@typescript-eslint/parser": "^7.1.0",
"@typescript-eslint/typescript-estree": "^7.1.0",
"clang-format": "^1.8.0",
"eslint": "^8.42.0",
"eslint-config-prettier": "^8.8.0",
"eslint-plugin-node": "^11.1.0",
@ -33,16 +34,16 @@
"execa": "^2.0.3",
"gulp": "^4.0.2",
"gulp-mocha": "^6.0.0",
"lodash": "^4.17.4",
"lodash": "^4.17.21",
"madge": "^5.0.1",
"mocha-jenkins-reporter": "^0.4.1",
"ncp": "^2.0.0",
"pify": "^4.0.1",
"prettier": "^2.8.8",
"rimraf": "^3.0.2",
"semver": "^7.3.5",
"ts-node": "^10.9.1",
"typescript": "^5.1.3"
"semver": "^7.6.0",
"ts-node": "^10.9.2",
"typescript": "^5.3.3"
},
"contributors": [
{
@ -65,8 +66,8 @@
"generate-test-types": "proto-loader-gen-types --keepCase --longs String --enums String --defaults --oneofs --includeComments --include-dirs test/fixtures/ -O test/generated/ --grpcLib ../../src/index test_service.proto"
},
"dependencies": {
"@grpc/proto-loader": "^0.7.8",
"@types/node": ">=12.12.47"
"@grpc/proto-loader": "^0.7.10",
"@js-sdsl/ordered-map": "^4.4.2"
},
"files": [
"src/**/*.ts",

View File

@ -20,7 +20,7 @@ import { ChannelOptions } from './channel-options';
import { ServerSurfaceCall } from './server-call';
import { ConnectivityState } from './connectivity-state';
import { ChannelRef } from './channelz';
import type { ChannelRef } from './channelz';
import { Call } from './call-interface';
import { InternalChannel } from './internal-channel';
import { Deadline } from './deadline';

View File

@ -16,6 +16,7 @@
*/
import { isIPv4, isIPv6 } from 'net';
import { OrderedMap, type OrderedMapIterator } from '@js-sdsl/ordered-map';
import { ConnectivityState } from './connectivity-state';
import { Status } from './constants';
import { Timestamp } from './generated/google/protobuf/Timestamp';
@ -66,24 +67,25 @@ export type TraceSeverity =
| 'CT_ERROR';
export interface ChannelRef {
kind: 'channel';
kind: EntityTypes.channel;
id: number;
name: string;
}
export interface SubchannelRef {
kind: 'subchannel';
kind: EntityTypes.subchannel;
id: number;
name: string;
}
export interface ServerRef {
kind: 'server';
kind: EntityTypes.server;
id: number;
name: string;
}
export interface SocketRef {
kind: 'socket';
kind: EntityTypes.socket;
id: number;
name: string;
}
@ -131,6 +133,21 @@ interface TraceEvent {
*/
const TARGET_RETAINED_TRACES = 32;
export class ChannelzTraceStub {
readonly events: TraceEvent[] = [];
readonly creationTimestamp: Date = new Date();
readonly eventsLogged = 0;
addTrace(): void {}
getTraceMessage(): ChannelTrace {
return {
creation_timestamp: dateToProtoTimestamp(this.creationTimestamp),
num_events_logged: this.eventsLogged,
events: [],
};
}
}
export class ChannelzTrace {
events: TraceEvent[] = [];
creationTimestamp: Date;
@ -182,105 +199,64 @@ export class ChannelzTrace {
}
export class ChannelzChildrenTracker {
private channelChildren: Map<number, { ref: ChannelRef; count: number }> =
new Map<number, { ref: ChannelRef; count: number }>();
private subchannelChildren: Map<
private channelChildren = new OrderedMap<
number,
{ ref: ChannelRef; count: number }
>();
private subchannelChildren = new OrderedMap<
number,
{ ref: SubchannelRef; count: number }
> = new Map<number, { ref: SubchannelRef; count: number }>();
private socketChildren: Map<number, { ref: SocketRef; count: number }> =
new Map<number, { ref: SocketRef; count: number }>();
>();
private socketChildren = new OrderedMap<
number,
{ ref: SocketRef; count: number }
>();
private trackerMap = {
[EntityTypes.channel]: this.channelChildren,
[EntityTypes.subchannel]: this.subchannelChildren,
[EntityTypes.socket]: this.socketChildren,
} as const;
refChild(child: ChannelRef | SubchannelRef | SocketRef) {
switch (child.kind) {
case 'channel': {
const trackedChild = this.channelChildren.get(child.id) ?? {
ref: child,
count: 0,
};
trackedChild.count += 1;
this.channelChildren.set(child.id, trackedChild);
break;
}
case 'subchannel': {
const trackedChild = this.subchannelChildren.get(child.id) ?? {
ref: child,
count: 0,
};
trackedChild.count += 1;
this.subchannelChildren.set(child.id, trackedChild);
break;
}
case 'socket': {
const trackedChild = this.socketChildren.get(child.id) ?? {
ref: child,
count: 0,
};
trackedChild.count += 1;
this.socketChildren.set(child.id, trackedChild);
break;
}
const tracker = this.trackerMap[child.kind];
const trackedChild = tracker.getElementByKey(child.id);
if (trackedChild === undefined) {
tracker.setElement(child.id, {
// @ts-expect-error union issues
ref: child,
count: 1,
});
} else {
trackedChild.count += 1;
}
}
unrefChild(child: ChannelRef | SubchannelRef | SocketRef) {
switch (child.kind) {
case 'channel': {
const trackedChild = this.channelChildren.get(child.id);
if (trackedChild !== undefined) {
trackedChild.count -= 1;
if (trackedChild.count === 0) {
this.channelChildren.delete(child.id);
} else {
this.channelChildren.set(child.id, trackedChild);
}
}
break;
}
case 'subchannel': {
const trackedChild = this.subchannelChildren.get(child.id);
if (trackedChild !== undefined) {
trackedChild.count -= 1;
if (trackedChild.count === 0) {
this.subchannelChildren.delete(child.id);
} else {
this.subchannelChildren.set(child.id, trackedChild);
}
}
break;
}
case 'socket': {
const trackedChild = this.socketChildren.get(child.id);
if (trackedChild !== undefined) {
trackedChild.count -= 1;
if (trackedChild.count === 0) {
this.socketChildren.delete(child.id);
} else {
this.socketChildren.set(child.id, trackedChild);
}
}
break;
const tracker = this.trackerMap[child.kind];
const trackedChild = tracker.getElementByKey(child.id);
if (trackedChild !== undefined) {
trackedChild.count -= 1;
if (trackedChild.count === 0) {
tracker.eraseElementByKey(child.id);
}
}
}
getChildLists(): ChannelzChildren {
const channels: ChannelRef[] = [];
for (const { ref } of this.channelChildren.values()) {
channels.push(ref);
}
const subchannels: SubchannelRef[] = [];
for (const { ref } of this.subchannelChildren.values()) {
subchannels.push(ref);
}
const sockets: SocketRef[] = [];
for (const { ref } of this.socketChildren.values()) {
sockets.push(ref);
}
return { channels, subchannels, sockets };
return {
channels: this.channelChildren,
subchannels: this.subchannelChildren,
sockets: this.socketChildren,
};
}
}
export class ChannelzChildrenTrackerStub extends ChannelzChildrenTracker {
override refChild(): void {}
override unrefChild(): void {}
}
export class ChannelzCallTracker {
callsStarted = 0;
callsSucceeded = 0;
@ -299,17 +275,23 @@ export class ChannelzCallTracker {
}
}
export class ChannelzCallTrackerStub extends ChannelzCallTracker {
override addCallStarted() {}
override addCallSucceeded() {}
override addCallFailed() {}
}
export interface ChannelzChildren {
channels: ChannelRef[];
subchannels: SubchannelRef[];
sockets: SocketRef[];
channels: OrderedMap<number, { ref: ChannelRef; count: number }>;
subchannels: OrderedMap<number, { ref: SubchannelRef; count: number }>;
sockets: OrderedMap<number, { ref: SocketRef; count: number }>;
}
export interface ChannelInfo {
target: string;
state: ConnectivityState;
trace: ChannelzTrace;
callTracker: ChannelzCallTracker;
trace: ChannelzTrace | ChannelzTraceStub;
callTracker: ChannelzCallTracker | ChannelzCallTrackerStub;
children: ChannelzChildren;
}
@ -348,105 +330,102 @@ export interface SocketInfo {
remoteFlowControlWindow: number | null;
}
interface ChannelEntry {
type ChannelEntry = {
ref: ChannelRef;
getInfo(): ChannelInfo;
}
};
interface SubchannelEntry {
type SubchannelEntry = {
ref: SubchannelRef;
getInfo(): SubchannelInfo;
}
};
interface ServerEntry {
type ServerEntry = {
ref: ServerRef;
getInfo(): ServerInfo;
}
};
interface SocketEntry {
type SocketEntry = {
ref: SocketRef;
getInfo(): SocketInfo;
};
export const enum EntityTypes {
channel = 'channel',
subchannel = 'subchannel',
server = 'server',
socket = 'socket',
}
let nextId = 1;
const entityMaps = {
[EntityTypes.channel]: new OrderedMap<number, ChannelEntry>(),
[EntityTypes.subchannel]: new OrderedMap<number, SubchannelEntry>(),
[EntityTypes.server]: new OrderedMap<number, ServerEntry>(),
[EntityTypes.socket]: new OrderedMap<number, SocketEntry>(),
} as const;
function getNextId(): number {
return nextId++;
}
export type RefByType<T extends EntityTypes> = T extends EntityTypes.channel
? ChannelRef
: T extends EntityTypes.server
? ServerRef
: T extends EntityTypes.socket
? SocketRef
: T extends EntityTypes.subchannel
? SubchannelRef
: never;
const channels: (ChannelEntry | undefined)[] = [];
const subchannels: (SubchannelEntry | undefined)[] = [];
const servers: (ServerEntry | undefined)[] = [];
const sockets: (SocketEntry | undefined)[] = [];
export type EntryByType<T extends EntityTypes> = T extends EntityTypes.channel
? ChannelEntry
: T extends EntityTypes.server
? ServerEntry
: T extends EntityTypes.socket
? SocketEntry
: T extends EntityTypes.subchannel
? SubchannelEntry
: never;
export function registerChannelzChannel(
name: string,
getInfo: () => ChannelInfo,
channelzEnabled: boolean
): ChannelRef {
const id = getNextId();
const ref: ChannelRef = { id, name, kind: 'channel' };
if (channelzEnabled) {
channels[id] = { ref, getInfo };
export type InfoByType<T extends EntityTypes> = T extends EntityTypes.channel
? ChannelInfo
: T extends EntityTypes.subchannel
? SubchannelInfo
: T extends EntityTypes.server
? ServerInfo
: T extends EntityTypes.socket
? SocketInfo
: never;
const generateRegisterFn = <R extends EntityTypes>(kind: R) => {
let nextId = 1;
function getNextId(): number {
return nextId++;
}
return ref;
}
export function registerChannelzSubchannel(
name: string,
getInfo: () => SubchannelInfo,
channelzEnabled: boolean
): SubchannelRef {
const id = getNextId();
const ref: SubchannelRef = { id, name, kind: 'subchannel' };
if (channelzEnabled) {
subchannels[id] = { ref, getInfo };
}
return ref;
}
return (
name: string,
getInfo: () => InfoByType<R>,
channelzEnabled: boolean
): RefByType<R> => {
const id = getNextId();
const ref = { id, name, kind } as RefByType<R>;
if (channelzEnabled) {
// @ts-expect-error typing issues
entityMaps[kind].setElement(id, { ref, getInfo });
}
return ref;
};
};
export function registerChannelzServer(
getInfo: () => ServerInfo,
channelzEnabled: boolean
): ServerRef {
const id = getNextId();
const ref: ServerRef = { id, kind: 'server' };
if (channelzEnabled) {
servers[id] = { ref, getInfo };
}
return ref;
}
export function registerChannelzSocket(
name: string,
getInfo: () => SocketInfo,
channelzEnabled: boolean
): SocketRef {
const id = getNextId();
const ref: SocketRef = { id, name, kind: 'socket' };
if (channelzEnabled) {
sockets[id] = { ref, getInfo };
}
return ref;
}
export const registerChannelzChannel = generateRegisterFn(EntityTypes.channel);
export const registerChannelzSubchannel = generateRegisterFn(
EntityTypes.subchannel
);
export const registerChannelzServer = generateRegisterFn(EntityTypes.server);
export const registerChannelzSocket = generateRegisterFn(EntityTypes.socket);
export function unregisterChannelzRef(
ref: ChannelRef | SubchannelRef | ServerRef | SocketRef
) {
switch (ref.kind) {
case 'channel':
delete channels[ref.id];
return;
case 'subchannel':
delete subchannels[ref.id];
return;
case 'server':
delete servers[ref.id];
return;
case 'socket':
delete sockets[ref.id];
return;
}
entityMaps[ref.kind].eraseElementByKey(ref.id);
}
/**
@ -556,6 +535,17 @@ function dateToProtoTimestamp(date?: Date | null): Timestamp | null {
function getChannelMessage(channelEntry: ChannelEntry): ChannelMessage {
const resolvedInfo = channelEntry.getInfo();
const channelRef: ChannelRefMessage[] = [];
const subchannelRef: SubchannelRefMessage[] = [];
resolvedInfo.children.channels.forEach(el => {
channelRef.push(channelRefToMessage(el[1].ref));
});
resolvedInfo.children.subchannels.forEach(el => {
subchannelRef.push(subchannelRefToMessage(el[1].ref));
});
return {
ref: channelRefToMessage(channelEntry.ref),
data: {
@ -569,12 +559,8 @@ function getChannelMessage(channelEntry: ChannelEntry): ChannelMessage {
),
trace: resolvedInfo.trace.getTraceMessage(),
},
channel_ref: resolvedInfo.children.channels.map(ref =>
channelRefToMessage(ref)
),
subchannel_ref: resolvedInfo.children.subchannels.map(ref =>
subchannelRefToMessage(ref)
),
channel_ref: channelRef,
subchannel_ref: subchannelRef,
};
}
@ -582,8 +568,9 @@ function GetChannel(
call: ServerUnaryCall<GetChannelRequest__Output, GetChannelResponse>,
callback: sendUnaryData<GetChannelResponse>
): void {
const channelId = Number.parseInt(call.request.channel_id);
const channelEntry = channels[channelId];
const channelId = parseInt(call.request.channel_id, 10);
const channelEntry =
entityMaps[EntityTypes.channel].getElementByKey(channelId);
if (channelEntry === undefined) {
callback({
code: Status.NOT_FOUND,
@ -598,27 +585,34 @@ function GetTopChannels(
call: ServerUnaryCall<GetTopChannelsRequest__Output, GetTopChannelsResponse>,
callback: sendUnaryData<GetTopChannelsResponse>
): void {
const maxResults = Number.parseInt(call.request.max_results);
const maxResults = parseInt(call.request.max_results, 10) || 100;
const resultList: ChannelMessage[] = [];
let i = Number.parseInt(call.request.start_channel_id);
for (; i < channels.length; i++) {
const channelEntry = channels[i];
if (channelEntry === undefined) {
continue;
}
resultList.push(getChannelMessage(channelEntry));
if (resultList.length >= maxResults) {
break;
}
const startId = parseInt(call.request.start_channel_id, 10);
const channelEntries = entityMaps[EntityTypes.channel];
let i: OrderedMapIterator<number, ChannelEntry>;
for (
i = channelEntries.lowerBound(startId);
!i.equals(channelEntries.end()) && resultList.length < maxResults;
i = i.next()
) {
resultList.push(getChannelMessage(i.pointer[1]));
}
callback(null, {
channel: resultList,
end: i >= servers.length,
end: i.equals(channelEntries.end()),
});
}
function getServerMessage(serverEntry: ServerEntry): ServerMessage {
const resolvedInfo = serverEntry.getInfo();
const listenSocket: SocketRefMessage[] = [];
resolvedInfo.listenerChildren.sockets.forEach(el => {
listenSocket.push(socketRefToMessage(el[1].ref));
});
return {
ref: serverRefToMessage(serverEntry.ref),
data: {
@ -630,9 +624,7 @@ function getServerMessage(serverEntry: ServerEntry): ServerMessage {
),
trace: resolvedInfo.trace.getTraceMessage(),
},
listen_socket: resolvedInfo.listenerChildren.sockets.map(ref =>
socketRefToMessage(ref)
),
listen_socket: listenSocket,
};
}
@ -640,8 +632,9 @@ function GetServer(
call: ServerUnaryCall<GetServerRequest__Output, GetServerResponse>,
callback: sendUnaryData<GetServerResponse>
): void {
const serverId = Number.parseInt(call.request.server_id);
const serverEntry = servers[serverId];
const serverId = parseInt(call.request.server_id, 10);
const serverEntries = entityMaps[EntityTypes.server];
const serverEntry = serverEntries.getElementByKey(serverId);
if (serverEntry === undefined) {
callback({
code: Status.NOT_FOUND,
@ -656,22 +649,23 @@ function GetServers(
call: ServerUnaryCall<GetServersRequest__Output, GetServersResponse>,
callback: sendUnaryData<GetServersResponse>
): void {
const maxResults = Number.parseInt(call.request.max_results);
const maxResults = parseInt(call.request.max_results, 10) || 100;
const startId = parseInt(call.request.start_server_id, 10);
const serverEntries = entityMaps[EntityTypes.server];
const resultList: ServerMessage[] = [];
let i = Number.parseInt(call.request.start_server_id);
for (; i < servers.length; i++) {
const serverEntry = servers[i];
if (serverEntry === undefined) {
continue;
}
resultList.push(getServerMessage(serverEntry));
if (resultList.length >= maxResults) {
break;
}
let i: OrderedMapIterator<number, ServerEntry>;
for (
i = serverEntries.lowerBound(startId);
!i.equals(serverEntries.end()) && resultList.length < maxResults;
i = i.next()
) {
resultList.push(getServerMessage(i.pointer[1]));
}
callback(null, {
server: resultList,
end: i >= servers.length,
end: i.equals(serverEntries.end()),
});
}
@ -679,8 +673,9 @@ function GetSubchannel(
call: ServerUnaryCall<GetSubchannelRequest__Output, GetSubchannelResponse>,
callback: sendUnaryData<GetSubchannelResponse>
): void {
const subchannelId = Number.parseInt(call.request.subchannel_id);
const subchannelEntry = subchannels[subchannelId];
const subchannelId = parseInt(call.request.subchannel_id, 10);
const subchannelEntry =
entityMaps[EntityTypes.subchannel].getElementByKey(subchannelId);
if (subchannelEntry === undefined) {
callback({
code: Status.NOT_FOUND,
@ -689,6 +684,12 @@ function GetSubchannel(
return;
}
const resolvedInfo = subchannelEntry.getInfo();
const listenSocket: SocketRefMessage[] = [];
resolvedInfo.children.sockets.forEach(el => {
listenSocket.push(socketRefToMessage(el[1].ref));
});
const subchannelMessage: SubchannelMessage = {
ref: subchannelRefToMessage(subchannelEntry.ref),
data: {
@ -702,9 +703,7 @@ function GetSubchannel(
),
trace: resolvedInfo.trace.getTraceMessage(),
},
socket_ref: resolvedInfo.children.sockets.map(ref =>
socketRefToMessage(ref)
),
socket_ref: listenSocket,
};
callback(null, { subchannel: subchannelMessage });
}
@ -735,8 +734,8 @@ function GetSocket(
call: ServerUnaryCall<GetSocketRequest__Output, GetSocketResponse>,
callback: sendUnaryData<GetSocketResponse>
): void {
const socketId = Number.parseInt(call.request.socket_id);
const socketEntry = sockets[socketId];
const socketId = parseInt(call.request.socket_id, 10);
const socketEntry = entityMaps[EntityTypes.socket].getElementByKey(socketId);
if (socketEntry === undefined) {
callback({
code: Status.NOT_FOUND,
@ -809,8 +808,9 @@ function GetServerSockets(
>,
callback: sendUnaryData<GetServerSocketsResponse>
): void {
const serverId = Number.parseInt(call.request.server_id);
const serverEntry = servers[serverId];
const serverId = parseInt(call.request.server_id, 10);
const serverEntry = entityMaps[EntityTypes.server].getElementByKey(serverId);
if (serverEntry === undefined) {
callback({
code: Status.NOT_FOUND,
@ -818,28 +818,28 @@ function GetServerSockets(
});
return;
}
const startId = Number.parseInt(call.request.start_socket_id);
const maxResults = Number.parseInt(call.request.max_results);
const startId = parseInt(call.request.start_socket_id, 10);
const maxResults = parseInt(call.request.max_results, 10) || 100;
const resolvedInfo = serverEntry.getInfo();
// If we wanted to include listener sockets in the result, this line would
// instead say
// const allSockets = resolvedInfo.listenerChildren.sockets.concat(resolvedInfo.sessionChildren.sockets).sort((ref1, ref2) => ref1.id - ref2.id);
const allSockets = resolvedInfo.sessionChildren.sockets.sort(
(ref1, ref2) => ref1.id - ref2.id
);
const allSockets = resolvedInfo.sessionChildren.sockets;
const resultList: SocketRefMessage[] = [];
let i = 0;
for (; i < allSockets.length; i++) {
if (allSockets[i].id >= startId) {
resultList.push(socketRefToMessage(allSockets[i]));
if (resultList.length >= maxResults) {
break;
}
}
let i: OrderedMapIterator<number, { ref: SocketRef }>;
for (
i = allSockets.lowerBound(startId);
!i.equals(allSockets.end()) && resultList.length < maxResults;
i = i.next()
) {
resultList.push(socketRefToMessage(i.pointer[1].ref));
}
callback(null, {
socket_ref: resultList,
end: i >= allSockets.length,
end: i.equals(allSockets.end()),
});
}

View File

@ -25,7 +25,7 @@ import { Endpoint, SubchannelAddress } from './subchannel-address';
import { ChannelOptions } from './channel-options';
import { ConnectivityState } from './connectivity-state';
import { Picker } from './picker';
import { ChannelRef, SubchannelRef } from './channelz';
import type { ChannelRef, SubchannelRef } from './channelz';
import { SubchannelInterface } from './subchannel-interface';
const TYPE_NAME = 'child_load_balancer_helper';

View File

@ -19,7 +19,7 @@ import { ChannelOptions } from './channel-options';
import { Endpoint, SubchannelAddress } from './subchannel-address';
import { ConnectivityState } from './connectivity-state';
import { Picker } from './picker';
import { ChannelRef, SubchannelRef } from './channelz';
import type { ChannelRef, SubchannelRef } from './channelz';
import { SubchannelInterface } from './subchannel-interface';
import { LoadBalancingConfig } from './service-config';
import { log } from './logging';

View File

@ -19,12 +19,12 @@ import { EventEmitter } from 'events';
import { Duplex, Readable, Writable } from 'stream';
import { Status } from './constants';
import { Deserialize, Serialize } from './make-client';
import type { Deserialize, Serialize } from './make-client';
import { Metadata } from './metadata';
import { ObjectReadable, ObjectWritable } from './object-stream';
import { StatusObject, PartialStatusObject } from './call-interface';
import { Deadline } from './deadline';
import { ServerInterceptingCallInterface } from './server-interceptors';
import type { ObjectReadable, ObjectWritable } from './object-stream';
import type { StatusObject, PartialStatusObject } from './call-interface';
import type { Deadline } from './deadline';
import type { ServerInterceptingCallInterface } from './server-interceptors';
export type ServerStatusResponse = Partial<StatusObject>;
@ -330,7 +330,7 @@ export interface UnaryHandler<RequestType, ResponseType> {
func: handleUnaryCall<RequestType, ResponseType>;
serialize: Serialize<ResponseType>;
deserialize: Deserialize<RequestType>;
type: HandlerType;
type: 'unary';
path: string;
}
@ -338,7 +338,7 @@ export interface ClientStreamingHandler<RequestType, ResponseType> {
func: handleClientStreamingCall<RequestType, ResponseType>;
serialize: Serialize<ResponseType>;
deserialize: Deserialize<RequestType>;
type: HandlerType;
type: 'clientStream';
path: string;
}
@ -346,7 +346,7 @@ export interface ServerStreamingHandler<RequestType, ResponseType> {
func: handleServerStreamingCall<RequestType, ResponseType>;
serialize: Serialize<ResponseType>;
deserialize: Deserialize<RequestType>;
type: HandlerType;
type: 'serverStream';
path: string;
}
@ -354,7 +354,7 @@ export interface BidiStreamingHandler<RequestType, ResponseType> {
func: handleBidiStreamingCall<RequestType, ResponseType>;
serialize: Serialize<ResponseType>;
deserialize: Deserialize<RequestType>;
type: HandlerType;
type: 'bidi';
path: string;
}

View File

@ -64,8 +64,11 @@ import {
} from './uri-parser';
import {
ChannelzCallTracker,
ChannelzCallTrackerStub,
ChannelzChildrenTracker,
ChannelzChildrenTrackerStub,
ChannelzTrace,
ChannelzTraceStub,
registerChannelzServer,
registerChannelzSocket,
ServerInfo,
@ -87,6 +90,7 @@ import { CallEventTracker } from './transport';
const UNLIMITED_CONNECTION_AGE_MS = ~(1 << 31);
const KEEPALIVE_MAX_TIME_MS = ~(1 << 31);
const KEEPALIVE_TIMEOUT_MS = 20000;
const MAX_CONNECTION_IDLE_MS = 30 * 60 * 1e3; // 30 min
const { HTTP2_HEADER_PATH } = http2.constants;
@ -177,9 +181,10 @@ function getDefaultHandler(handlerType: HandlerType, methodName: string) {
interface ChannelzSessionInfo {
ref: SocketRef;
streamTracker: ChannelzCallTracker;
streamTracker: ChannelzCallTracker | ChannelzCallTrackerStub;
messagesSent: number;
messagesReceived: number;
keepAlivesSent: number;
lastMessageSentTimestamp: Date | null;
lastMessageReceivedTimestamp: Date | null;
}
@ -243,6 +248,13 @@ export interface ServerOptions extends ChannelOptions {
export class Server {
private boundPorts: Map<string, BoundPort> = new Map();
private http2Servers: Map<AnyHttp2Server, Http2ServerInfo> = new Map();
private sessionIdleTimeouts = new Map<
http2.Http2Session,
{
activeStreams: number;
timeout: NodeJS.Timeout | null;
}
>();
private handlers: Map<string, UntypedHandler> = new Map<
string,
@ -261,10 +273,14 @@ export class Server {
// Channelz Info
private readonly channelzEnabled: boolean = true;
private channelzRef: ServerRef;
private channelzTrace = new ChannelzTrace();
private callTracker = new ChannelzCallTracker();
private listenerChildrenTracker = new ChannelzChildrenTracker();
private sessionChildrenTracker = new ChannelzChildrenTracker();
private channelzTrace: ChannelzTrace | ChannelzTraceStub;
private callTracker: ChannelzCallTracker | ChannelzCallTrackerStub;
private listenerChildrenTracker:
| ChannelzChildrenTracker
| ChannelzChildrenTrackerStub;
private sessionChildrenTracker:
| ChannelzChildrenTracker
| ChannelzChildrenTrackerStub;
private readonly maxConnectionAgeMs: number;
private readonly maxConnectionAgeGraceMs: number;
@ -272,6 +288,8 @@ export class Server {
private readonly keepaliveTimeMs: number;
private readonly keepaliveTimeoutMs: number;
private readonly sessionIdleTimeout: number;
private readonly interceptors: ServerInterceptor[];
/**
@ -284,14 +302,24 @@ export class Server {
this.options = options ?? {};
if (this.options['grpc.enable_channelz'] === 0) {
this.channelzEnabled = false;
this.channelzTrace = new ChannelzTraceStub();
this.callTracker = new ChannelzCallTrackerStub();
this.listenerChildrenTracker = new ChannelzChildrenTrackerStub();
this.sessionChildrenTracker = new ChannelzChildrenTrackerStub();
} else {
this.channelzTrace = new ChannelzTrace();
this.callTracker = new ChannelzCallTracker();
this.listenerChildrenTracker = new ChannelzChildrenTracker();
this.sessionChildrenTracker = new ChannelzChildrenTracker();
}
this.channelzRef = registerChannelzServer(
'server',
() => this.getChannelzInfo(),
this.channelzEnabled
);
if (this.channelzEnabled) {
this.channelzTrace.addTrace('CT_INFO', 'Server created');
}
this.channelzTrace.addTrace('CT_INFO', 'Server created');
this.maxConnectionAgeMs =
this.options['grpc.max_connection_age_ms'] ?? UNLIMITED_CONNECTION_AGE_MS;
this.maxConnectionAgeGraceMs =
@ -301,6 +329,9 @@ export class Server {
this.options['grpc.keepalive_time_ms'] ?? KEEPALIVE_MAX_TIME_MS;
this.keepaliveTimeoutMs =
this.options['grpc.keepalive_timeout_ms'] ?? KEEPALIVE_TIMEOUT_MS;
this.sessionIdleTimeout =
this.options['grpc.max_connection_idle'] ?? MAX_CONNECTION_IDLE_MS;
this.commonServerOptions = {
maxSendHeaderBlockLength: Number.MAX_SAFE_INTEGER,
};
@ -382,7 +413,7 @@ export class Server {
streamsFailed: sessionInfo.streamTracker.callsFailed,
messagesSent: sessionInfo.messagesSent,
messagesReceived: sessionInfo.messagesReceived,
keepAlivesSent: 0,
keepAlivesSent: sessionInfo.keepAlivesSent,
lastLocalStreamCreatedTimestamp: null,
lastRemoteStreamCreatedTimestamp:
sessionInfo.streamTracker.lastCallStartedTimestamp,
@ -581,9 +612,8 @@ export class Server {
const channelzRef = this.registerListenerToChannelz(
boundSubchannelAddress
);
if (this.channelzEnabled) {
this.listenerChildrenTracker.refChild(channelzRef);
}
this.listenerChildrenTracker.refChild(channelzRef);
this.http2Servers.set(http2Server, {
channelzRef: channelzRef,
sessions: new Set(),
@ -854,7 +884,7 @@ export class Server {
);
const serverInfo = this.http2Servers.get(server);
server.close(() => {
if (this.channelzEnabled && serverInfo) {
if (serverInfo) {
this.listenerChildrenTracker.unrefChild(serverInfo.channelzRef);
unregisterChannelzRef(serverInfo.channelzRef);
}
@ -870,15 +900,15 @@ export class Server {
this.trace('Closing session initiated by ' + session.socket?.remoteAddress);
const sessionInfo = this.sessions.get(session);
const closeCallback = () => {
if (this.channelzEnabled && sessionInfo) {
if (sessionInfo) {
this.sessionChildrenTracker.unrefChild(sessionInfo.ref);
unregisterChannelzRef(sessionInfo.ref);
this.sessions.delete(session);
}
this.sessions.delete(session);
callback?.();
};
if (session.closed) {
process.nextTick(closeCallback);
queueMicrotask(closeCallback);
} else {
session.close(closeCallback);
}
@ -956,14 +986,13 @@ export class Server {
const allSessions: Set<http2.Http2Session> = new Set();
for (const http2Server of boundPortObject.listeningServers) {
const serverEntry = this.http2Servers.get(http2Server);
if (!serverEntry) {
continue;
}
for (const session of serverEntry.sessions) {
allSessions.add(session);
this.closeSession(session, () => {
allSessions.delete(session);
});
if (serverEntry) {
for (const session of serverEntry.sessions) {
allSessions.add(session);
this.closeSession(session, () => {
allSessions.delete(session);
});
}
}
}
/* After the grace time ends, send another goaway to all remaining sessions
@ -995,9 +1024,7 @@ export class Server {
session.destroy(http2.constants.NGHTTP2_CANCEL as any);
});
this.sessions.clear();
if (this.channelzEnabled) {
unregisterChannelzRef(this.channelzRef);
}
unregisterChannelzRef(this.channelzRef);
this.shutdown = true;
}
@ -1049,9 +1076,7 @@ export class Server {
tryShutdown(callback: (error?: Error) => void): void {
const wrappedCallback = (error?: Error) => {
if (this.channelzEnabled) {
unregisterChannelzRef(this.channelzRef);
}
unregisterChannelzRef(this.channelzRef);
callback(error);
};
let pendingChecks = 0;
@ -1065,24 +1090,26 @@ export class Server {
}
this.shutdown = true;
for (const server of this.http2Servers.keys()) {
for (const [serverKey, server] of this.http2Servers.entries()) {
pendingChecks++;
const serverString = this.http2Servers.get(server)!.channelzRef.name;
const serverString = server.channelzRef.name;
this.trace('Waiting for server ' + serverString + ' to close');
this.closeServer(server, () => {
this.closeServer(serverKey, () => {
this.trace('Server ' + serverString + ' finished closing');
maybeCallback();
});
for (const session of server.sessions.keys()) {
pendingChecks++;
const sessionString = session.socket?.remoteAddress;
this.trace('Waiting for session ' + sessionString + ' to close');
this.closeSession(session, () => {
this.trace('Session ' + sessionString + ' finished closing');
maybeCallback();
});
}
}
for (const session of this.sessions.keys()) {
pendingChecks++;
const sessionString = session.socket?.remoteAddress;
this.trace('Waiting for session ' + sessionString + ' to close');
this.closeSession(session, () => {
this.trace('Session ' + sessionString + ' finished closing');
maybeCallback();
});
}
if (pendingChecks === 0) {
wrappedCallback();
}
@ -1160,142 +1187,395 @@ export class Server {
};
stream.respond(trailersToSend, { endStream: true });
if (this.channelzEnabled) {
this.callTracker.addCallFailed();
channelzSessionInfo?.streamTracker.addCallFailed();
}
this.callTracker.addCallFailed();
channelzSessionInfo?.streamTracker.addCallFailed();
}
private _channelzHandler(
stream: http2.ServerHttp2Stream,
headers: http2.IncomingHttpHeaders
private _sessionHandler(
http2Server: http2.Http2Server | http2.Http2SecureServer
) {
const channelzSessionInfo = this.sessions.get(
stream.session as http2.ServerHttp2Session
);
return (session: http2.ServerHttp2Session) => {
this.http2Servers.get(http2Server)?.sessions.add(session);
this.callTracker.addCallStarted();
channelzSessionInfo?.streamTracker.addCallStarted();
let connectionAgeTimer: NodeJS.Timeout | null = null;
let connectionAgeGraceTimer: NodeJS.Timeout | null = null;
let sessionClosedByServer = false;
if (!this._verifyContentType(stream, headers)) {
this.callTracker.addCallFailed();
channelzSessionInfo?.streamTracker.addCallFailed();
return;
}
const idleTimeoutObj = this.enableIdleTimeout(session);
const path = headers[HTTP2_HEADER_PATH] as string;
if (this.maxConnectionAgeMs !== UNLIMITED_CONNECTION_AGE_MS) {
// Apply a random jitter within a +/-10% range
const jitterMagnitude = this.maxConnectionAgeMs / 10;
const jitter = Math.random() * jitterMagnitude * 2 - jitterMagnitude;
const handler = this._retrieveHandler(path);
if (!handler) {
this._respondWithError(
getUnimplementedStatusResponse(path),
stream,
channelzSessionInfo
);
return;
}
connectionAgeTimer = setTimeout(() => {
sessionClosedByServer = true;
const callEventTracker: CallEventTracker = {
addMessageSent: () => {
if (channelzSessionInfo) {
channelzSessionInfo.messagesSent += 1;
channelzSessionInfo.lastMessageSentTimestamp = new Date();
}
},
addMessageReceived: () => {
if (channelzSessionInfo) {
channelzSessionInfo.messagesReceived += 1;
channelzSessionInfo.lastMessageReceivedTimestamp = new Date();
}
},
onCallEnd: status => {
if (status.code === Status.OK) {
this.callTracker.addCallSucceeded();
} else {
this.callTracker.addCallFailed();
}
},
onStreamEnd: success => {
if (channelzSessionInfo) {
if (success) {
channelzSessionInfo.streamTracker.addCallSucceeded();
} else {
channelzSessionInfo.streamTracker.addCallFailed();
this.trace(
`Connection dropped by max connection age: ${session.socket?.remoteAddress}`
);
try {
session.goaway(
http2.constants.NGHTTP2_NO_ERROR,
~(1 << 31),
Buffer.from('max_age')
);
} catch (e) {
// The goaway can't be sent because the session is already closed
session.destroy();
return;
}
session.close();
/* Allow a grace period after sending the GOAWAY before forcibly
* closing the connection. */
if (this.maxConnectionAgeGraceMs !== UNLIMITED_CONNECTION_AGE_MS) {
connectionAgeGraceTimer = setTimeout(() => {
session.destroy();
}, this.maxConnectionAgeGraceMs).unref?.();
}
}, this.maxConnectionAgeMs + jitter).unref?.();
}
const keeapliveTimeTimer: NodeJS.Timeout | null = setInterval(() => {
const timeoutTImer = setTimeout(() => {
sessionClosedByServer = true;
session.close();
}, this.keepaliveTimeoutMs).unref?.();
try {
session.ping(
(err: Error | null, duration: number, payload: Buffer) => {
clearTimeout(timeoutTImer);
if (err) {
sessionClosedByServer = true;
this.trace(
`Connection dropped due to error of a ping frame ${err.message} return in ${duration}`
);
session.close();
}
}
);
} catch (e) {
// The ping can't be sent because the session is already closed
session.destroy();
}
},
};
}, this.keepaliveTimeMs).unref?.();
const call = getServerInterceptingCall(
this.interceptors,
stream,
headers,
callEventTracker,
handler,
this.options
);
session.on('close', () => {
if (!sessionClosedByServer) {
this.trace(
`Connection dropped by client ${session.socket?.remoteAddress}`
);
}
if (!this._runHandlerForCall(call, handler)) {
this.callTracker.addCallFailed();
channelzSessionInfo?.streamTracker.addCallFailed();
if (connectionAgeTimer) {
clearTimeout(connectionAgeTimer);
}
call.sendStatus({
code: Status.INTERNAL,
details: `Unknown handler type: ${handler.type}`,
if (connectionAgeGraceTimer) {
clearTimeout(connectionAgeGraceTimer);
}
if (keeapliveTimeTimer) {
clearTimeout(keeapliveTimeTimer);
}
clearTimeout(idleTimeoutObj.timeout);
this.sessionIdleTimeouts.delete(session);
this.http2Servers.get(http2Server)?.sessions.delete(session);
});
}
};
}
private _streamHandler(
stream: http2.ServerHttp2Stream,
headers: http2.IncomingHttpHeaders
private _channelzSessionHandler(
http2Server: http2.Http2Server | http2.Http2SecureServer
) {
if (this._verifyContentType(stream, headers) !== true) {
return;
}
const path = headers[HTTP2_HEADER_PATH] as string;
const handler = this._retrieveHandler(path);
if (!handler) {
this._respondWithError(
getUnimplementedStatusResponse(path),
stream,
null
return (session: http2.ServerHttp2Session) => {
const channelzRef = registerChannelzSocket(
session.socket?.remoteAddress ?? 'unknown',
this.getChannelzSessionInfoGetter(session),
this.channelzEnabled
);
return;
}
const call = getServerInterceptingCall(
this.interceptors,
stream,
headers,
null,
handler,
this.options
);
const channelzSessionInfo: ChannelzSessionInfo = {
ref: channelzRef,
streamTracker: this.channelzEnabled
? new ChannelzCallTracker()
: new ChannelzCallTrackerStub(),
messagesSent: 0,
messagesReceived: 0,
keepAlivesSent: 0,
lastMessageSentTimestamp: null,
lastMessageReceivedTimestamp: null,
};
if (!this._runHandlerForCall(call, handler)) {
call.sendStatus({
code: Status.INTERNAL,
details: `Unknown handler type: ${handler.type}`,
this.http2Servers.get(http2Server)?.sessions.add(session);
this.sessions.set(session, channelzSessionInfo);
const clientAddress = `${session.socket.remoteAddress}:${session.socket.remotePort}`;
this.channelzTrace.addTrace(
'CT_INFO',
'Connection established by client ' + clientAddress
);
this.trace('Connection established by client ' + clientAddress);
this.sessionChildrenTracker.refChild(channelzRef);
let connectionAgeTimer: NodeJS.Timeout | null = null;
let connectionAgeGraceTimer: NodeJS.Timeout | null = null;
let sessionClosedByServer = false;
const idleTimeoutObj = this.enableIdleTimeout(session);
if (this.maxConnectionAgeMs !== UNLIMITED_CONNECTION_AGE_MS) {
// Apply a random jitter within a +/-10% range
const jitterMagnitude = this.maxConnectionAgeMs / 10;
const jitter = Math.random() * jitterMagnitude * 2 - jitterMagnitude;
connectionAgeTimer = setTimeout(() => {
sessionClosedByServer = true;
this.channelzTrace.addTrace(
'CT_INFO',
'Connection dropped by max connection age from ' + clientAddress
);
try {
session.goaway(
http2.constants.NGHTTP2_NO_ERROR,
~(1 << 31),
Buffer.from('max_age')
);
} catch (e) {
// The goaway can't be sent because the session is already closed
session.destroy();
return;
}
session.close();
/* Allow a grace period after sending the GOAWAY before forcibly
* closing the connection. */
if (this.maxConnectionAgeGraceMs !== UNLIMITED_CONNECTION_AGE_MS) {
connectionAgeGraceTimer = setTimeout(() => {
session.destroy();
}, this.maxConnectionAgeGraceMs).unref?.();
}
}, this.maxConnectionAgeMs + jitter).unref?.();
}
const keeapliveTimeTimer: NodeJS.Timeout | null = setInterval(() => {
const timeoutTImer = setTimeout(() => {
sessionClosedByServer = true;
this.channelzTrace.addTrace(
'CT_INFO',
'Connection dropped by keepalive timeout from ' + clientAddress
);
session.close();
}, this.keepaliveTimeoutMs).unref?.();
try {
session.ping(
(err: Error | null, duration: number, payload: Buffer) => {
clearTimeout(timeoutTImer);
if (err) {
sessionClosedByServer = true;
this.channelzTrace.addTrace(
'CT_INFO',
`Connection dropped due to error of a ping frame ${err.message} return in ${duration}`
);
session.close();
}
}
);
channelzSessionInfo.keepAlivesSent += 1;
} catch (e) {
// The ping can't be sent because the session is already closed
session.destroy();
}
}, this.keepaliveTimeMs).unref?.();
session.on('close', () => {
if (!sessionClosedByServer) {
this.channelzTrace.addTrace(
'CT_INFO',
'Connection dropped by client ' + clientAddress
);
}
this.trace(
`DROPPING ${channelzRef.name} - ${channelzRef.kind} - ${channelzRef.id}`
);
this.sessionChildrenTracker.unrefChild(channelzRef);
unregisterChannelzRef(channelzRef);
if (connectionAgeTimer) {
clearTimeout(connectionAgeTimer);
}
if (connectionAgeGraceTimer) {
clearTimeout(connectionAgeGraceTimer);
}
if (keeapliveTimeTimer) {
clearTimeout(keeapliveTimeTimer);
}
clearTimeout(idleTimeoutObj.timeout);
this.sessionIdleTimeouts.delete(session);
this.http2Servers.get(http2Server)?.sessions.delete(session);
this.sessions.delete(session);
});
}
};
}
private _channelzHandler(http2Server: http2.Http2Server) {
return (
stream: http2.ServerHttp2Stream,
headers: http2.IncomingHttpHeaders
) => {
// for handling idle timeout
this.onStreamOpened(stream);
const channelzSessionInfo = this.sessions.get(
stream.session as http2.ServerHttp2Session
);
this.callTracker.addCallStarted();
channelzSessionInfo?.streamTracker.addCallStarted();
if (!this._verifyContentType(stream, headers)) {
this.callTracker.addCallFailed();
channelzSessionInfo?.streamTracker.addCallFailed();
return;
}
const path = headers[HTTP2_HEADER_PATH] as string;
const handler = this._retrieveHandler(path);
if (!handler) {
this._respondWithError(
getUnimplementedStatusResponse(path),
stream,
channelzSessionInfo
);
return;
}
const callEventTracker: CallEventTracker = {
addMessageSent: () => {
if (channelzSessionInfo) {
channelzSessionInfo.messagesSent += 1;
channelzSessionInfo.lastMessageSentTimestamp = new Date();
}
},
addMessageReceived: () => {
if (channelzSessionInfo) {
channelzSessionInfo.messagesReceived += 1;
channelzSessionInfo.lastMessageReceivedTimestamp = new Date();
}
},
onCallEnd: status => {
if (status.code === Status.OK) {
this.callTracker.addCallSucceeded();
} else {
this.callTracker.addCallFailed();
}
},
onStreamEnd: success => {
if (channelzSessionInfo) {
if (success) {
channelzSessionInfo.streamTracker.addCallSucceeded();
} else {
channelzSessionInfo.streamTracker.addCallFailed();
}
}
},
};
const call = getServerInterceptingCall(
this.interceptors,
stream,
headers,
callEventTracker,
handler,
this.options
);
if (!this._runHandlerForCall(call, handler)) {
this.callTracker.addCallFailed();
channelzSessionInfo?.streamTracker.addCallFailed();
call.sendStatus({
code: Status.INTERNAL,
details: `Unknown handler type: ${handler.type}`,
});
}
};
}
private _streamHandler(http2Server: http2.Http2Server) {
return (
stream: http2.ServerHttp2Stream,
headers: http2.IncomingHttpHeaders
) => {
// for handling idle timeout
this.onStreamOpened(stream);
if (this._verifyContentType(stream, headers) !== true) {
return;
}
const path = headers[HTTP2_HEADER_PATH] as string;
const handler = this._retrieveHandler(path);
if (!handler) {
this._respondWithError(
getUnimplementedStatusResponse(path),
stream,
null
);
return;
}
const call = getServerInterceptingCall(
this.interceptors,
stream,
headers,
null,
handler,
this.options
);
if (!this._runHandlerForCall(call, handler)) {
call.sendStatus({
code: Status.INTERNAL,
details: `Unknown handler type: ${handler.type}`,
});
}
};
}
private _runHandlerForCall(
call: ServerInterceptingCallInterface,
handler: Handler<any, any>
handler:
| UntypedUnaryHandler
| UntypedClientStreamingHandler
| UntypedServerStreamingHandler
| UntypedBidiStreamingHandler
): boolean {
const { type } = handler;
if (type === 'unary') {
handleUnary(call, handler as UntypedUnaryHandler);
handleUnary(call, handler);
} else if (type === 'clientStream') {
handleClientStreaming(call, handler as UntypedClientStreamingHandler);
handleClientStreaming(call, handler);
} else if (type === 'serverStream') {
handleServerStreaming(call, handler as UntypedServerStreamingHandler);
handleServerStreaming(call, handler);
} else if (type === 'bidi') {
handleBidiStreaming(call, handler as UntypedBidiStreamingHandler);
handleBidiStreaming(call, handler);
} else {
return false;
}
@ -1322,118 +1602,78 @@ export class Server {
this.serverAddressString = serverAddressString;
const handler = this.channelzEnabled
? this._channelzHandler
: this._streamHandler;
? this._channelzHandler(http2Server)
: this._streamHandler(http2Server);
http2Server.on('stream', handler.bind(this));
http2Server.on('session', session => {
const channelzRef = registerChannelzSocket(
session.socket.remoteAddress ?? 'unknown',
this.getChannelzSessionInfoGetter(session),
this.channelzEnabled
const sessionHandler = this.channelzEnabled
? this._channelzSessionHandler(http2Server)
: this._sessionHandler(http2Server);
http2Server.on('stream', handler);
http2Server.on('session', sessionHandler);
}
private enableIdleTimeout(session: http2.ServerHttp2Session) {
const idleTimeoutObj = {
activeStreams: 0,
timeout: setTimeout(
this.onIdleTimeout,
this.sessionIdleTimeout,
this,
session
).unref(),
};
this.sessionIdleTimeouts.set(session, idleTimeoutObj);
this.trace(`Enable idle timeout for ${session.socket?.remoteAddress}`);
return idleTimeoutObj;
}
private onIdleTimeout(ctx: Server, session: http2.ServerHttp2Session) {
ctx.trace(`Idle timeout for ${session.socket?.remoteAddress}`);
ctx.closeSession(session);
}
private onStreamOpened(stream: http2.ServerHttp2Stream) {
const session = stream.session as http2.ServerHttp2Session;
this.trace(`Stream opened for ${session.socket?.remoteAddress}`);
const idleTimeoutObj = this.sessionIdleTimeouts.get(session);
if (idleTimeoutObj) {
idleTimeoutObj.activeStreams += 1;
if (idleTimeoutObj.timeout) {
clearTimeout(idleTimeoutObj.timeout);
idleTimeoutObj.timeout = null;
}
this.trace(
`onStreamOpened: adding on stream close event for ${session.socket?.remoteAddress}`
);
stream.once('close', () => this.onStreamClose(session));
} else {
this.trace(
`onStreamOpened: missing stream for ${session.socket?.remoteAddress}`
);
}
}
const channelzSessionInfo: ChannelzSessionInfo = {
ref: channelzRef,
streamTracker: new ChannelzCallTracker(),
messagesSent: 0,
messagesReceived: 0,
lastMessageSentTimestamp: null,
lastMessageReceivedTimestamp: null,
};
this.http2Servers.get(http2Server)?.sessions.add(session);
this.sessions.set(session, channelzSessionInfo);
const clientAddress = session.socket.remoteAddress;
if (this.channelzEnabled) {
this.channelzTrace.addTrace(
'CT_INFO',
'Connection established by client ' + clientAddress
private onStreamClose(session: http2.ServerHttp2Session) {
this.trace(`Stream closed for ${session.socket?.remoteAddress}`);
const idleTimeoutObj = this.sessionIdleTimeouts.get(session);
if (idleTimeoutObj) {
idleTimeoutObj.activeStreams -= 1;
if (idleTimeoutObj.activeStreams === 0) {
this.trace(
`onStreamClose: set idle timeout for ${this.sessionIdleTimeout}ms ${session.socket?.remoteAddress}`
);
this.sessionChildrenTracker.refChild(channelzRef);
idleTimeoutObj.timeout = setTimeout(
this.onIdleTimeout,
this.sessionIdleTimeout,
this,
session
).unref();
}
let connectionAgeTimer: NodeJS.Timeout | null = null;
let connectionAgeGraceTimer: NodeJS.Timeout | null = null;
let sessionClosedByServer = false;
if (this.maxConnectionAgeMs !== UNLIMITED_CONNECTION_AGE_MS) {
// Apply a random jitter within a +/-10% range
const jitterMagnitude = this.maxConnectionAgeMs / 10;
const jitter = Math.random() * jitterMagnitude * 2 - jitterMagnitude;
connectionAgeTimer = setTimeout(() => {
sessionClosedByServer = true;
if (this.channelzEnabled) {
this.channelzTrace.addTrace(
'CT_INFO',
'Connection dropped by max connection age from ' + clientAddress
);
}
try {
session.goaway(
http2.constants.NGHTTP2_NO_ERROR,
~(1 << 31),
Buffer.from('max_age')
);
} catch (e) {
// The goaway can't be sent because the session is already closed
session.destroy();
return;
}
session.close();
/* Allow a grace period after sending the GOAWAY before forcibly
* closing the connection. */
if (this.maxConnectionAgeGraceMs !== UNLIMITED_CONNECTION_AGE_MS) {
connectionAgeGraceTimer = setTimeout(() => {
session.destroy();
}, this.maxConnectionAgeGraceMs).unref?.();
}
}, this.maxConnectionAgeMs + jitter).unref?.();
}
const keeapliveTimeTimer: NodeJS.Timeout | null = setInterval(() => {
const timeoutTImer = setTimeout(() => {
sessionClosedByServer = true;
if (this.channelzEnabled) {
this.channelzTrace.addTrace(
'CT_INFO',
'Connection dropped by keepalive timeout from ' + clientAddress
);
}
session.close();
}, this.keepaliveTimeoutMs).unref?.();
try {
session.ping(
(err: Error | null, duration: number, payload: Buffer) => {
clearTimeout(timeoutTImer);
}
);
} catch (e) {
// The ping can't be sent because the session is already closed
session.destroy();
}
}, this.keepaliveTimeMs).unref?.();
session.on('close', () => {
if (this.channelzEnabled) {
if (!sessionClosedByServer) {
this.channelzTrace.addTrace(
'CT_INFO',
'Connection dropped by client ' + clientAddress
);
}
this.sessionChildrenTracker.unrefChild(channelzRef);
unregisterChannelzRef(channelzRef);
}
if (connectionAgeTimer) {
clearTimeout(connectionAgeTimer);
}
if (connectionAgeGraceTimer) {
clearTimeout(connectionAgeGraceTimer);
}
if (keeapliveTimeTimer) {
clearTimeout(keeapliveTimeTimer);
}
this.http2Servers.get(http2Server)?.sessions.delete(session);
this.sessions.delete(session);
});
});
}
}
}

View File

@ -15,7 +15,7 @@
*
*/
import { SubchannelRef } from './channelz';
import type { SubchannelRef } from './channelz';
import { ConnectivityState } from './connectivity-state';
import { Subchannel } from './subchannel';

View File

@ -31,10 +31,13 @@ import {
SubchannelRef,
ChannelzTrace,
ChannelzChildrenTracker,
ChannelzChildrenTrackerStub,
SubchannelInfo,
registerChannelzSubchannel,
ChannelzCallTracker,
ChannelzCallTrackerStub,
unregisterChannelzRef,
ChannelzTraceStub,
} from './channelz';
import {
ConnectivityStateListener,
@ -89,12 +92,15 @@ export class Subchannel {
// Channelz info
private readonly channelzEnabled: boolean = true;
private channelzRef: SubchannelRef;
private channelzTrace: ChannelzTrace;
private callTracker = new ChannelzCallTracker();
private childrenTracker = new ChannelzChildrenTracker();
private channelzTrace: ChannelzTrace | ChannelzTraceStub;
private callTracker: ChannelzCallTracker | ChannelzCallTrackerStub;
private childrenTracker:
| ChannelzChildrenTracker
| ChannelzChildrenTrackerStub;
// Channelz socket info
private streamTracker = new ChannelzCallTracker();
private streamTracker: ChannelzCallTracker | ChannelzCallTrackerStub;
/**
* A class representing a connection to a single backend.
@ -127,16 +133,24 @@ export class Subchannel {
if (options['grpc.enable_channelz'] === 0) {
this.channelzEnabled = false;
this.channelzTrace = new ChannelzTraceStub();
this.callTracker = new ChannelzCallTrackerStub();
this.childrenTracker = new ChannelzChildrenTrackerStub();
this.streamTracker = new ChannelzCallTrackerStub();
} else {
this.channelzTrace = new ChannelzTrace();
this.callTracker = new ChannelzCallTracker();
this.childrenTracker = new ChannelzChildrenTracker();
this.streamTracker = new ChannelzCallTracker();
}
this.channelzTrace = new ChannelzTrace();
this.channelzRef = registerChannelzSubchannel(
this.subchannelAddressString,
() => this.getChannelzInfo(),
this.channelzEnabled
);
if (this.channelzEnabled) {
this.channelzTrace.addTrace('CT_INFO', 'Subchannel created');
}
this.channelzTrace.addTrace('CT_INFO', 'Subchannel created');
this.trace(
'Subchannel constructed with options ' +
JSON.stringify(options, undefined, 2)
@ -338,12 +352,8 @@ export class Subchannel {
this.refTrace('refcount ' + this.refcount + ' -> ' + (this.refcount - 1));
this.refcount -= 1;
if (this.refcount === 0) {
if (this.channelzEnabled) {
this.channelzTrace.addTrace('CT_INFO', 'Shutting down');
}
if (this.channelzEnabled) {
unregisterChannelzRef(this.channelzRef);
}
this.channelzTrace.addTrace('CT_INFO', 'Shutting down');
unregisterChannelzRef(this.channelzRef);
process.nextTick(() => {
this.transitionToState(
[ConnectivityState.CONNECTING, ConnectivityState.READY],

View File

@ -28,6 +28,7 @@ import { ChannelCredentials } from './channel-credentials';
import { ChannelOptions } from './channel-options';
import {
ChannelzCallTracker,
ChannelzCallTrackerStub,
registerChannelzSocket,
SocketInfo,
SocketRef,
@ -136,7 +137,7 @@ class Http2Transport implements Transport {
// Channelz info
private channelzRef: SocketRef;
private readonly channelzEnabled: boolean = true;
private streamTracker = new ChannelzCallTracker();
private streamTracker: ChannelzCallTracker | ChannelzCallTrackerStub;
private keepalivesSent = 0;
private messagesSent = 0;
private messagesReceived = 0;
@ -159,12 +160,17 @@ class Http2Transport implements Transport {
if (options['grpc.enable_channelz'] === 0) {
this.channelzEnabled = false;
this.streamTracker = new ChannelzCallTrackerStub();
} else {
this.streamTracker = new ChannelzCallTracker();
}
this.channelzRef = registerChannelzSocket(
this.subchannelAddressString,
() => this.getChannelzInfo(),
this.channelzEnabled
);
// Build user-agent string.
this.userAgent = [
options['grpc.primary_user_agent'],
@ -192,6 +198,7 @@ class Http2Transport implements Transport {
this.stopKeepalivePings();
this.handleDisconnect();
});
session.once(
'goaway',
(errorCode: number, lastStreamID: number, opaqueData?: Buffer) => {
@ -214,11 +221,13 @@ class Http2Transport implements Transport {
this.reportDisconnectToOwner(tooManyPings);
}
);
session.once('error', error => {
/* Do nothing here. Any error should also trigger a close event, which is
* where we want to handle that. */
this.trace('connection closed with error ' + (error as Error).message);
});
if (logging.isTracerEnabled(TRACER_NAME)) {
session.on('remoteSettings', (settings: http2.Settings) => {
this.trace(
@ -237,6 +246,7 @@ class Http2Transport implements Transport {
);
});
}
/* Start the keepalive timer last, because this can trigger trace logs,
* which should only happen after everything else is set up. */
if (this.keepaliveWithoutCalls) {
@ -625,6 +635,7 @@ export class Http2SubchannelConnector implements SubchannelConnector {
private session: http2.ClientHttp2Session | null = null;
private isShutdown = false;
constructor(private channelTarget: GrpcUri) {}
private trace(text: string) {
logging.trace(
LogVerbosity.DEBUG,
@ -632,6 +643,7 @@ export class Http2SubchannelConnector implements SubchannelConnector {
uriToString(this.channelTarget) + ' ' + text
);
}
private createSession(
address: SubchannelAddress,
credentials: ChannelCredentials,
@ -641,6 +653,7 @@ export class Http2SubchannelConnector implements SubchannelConnector {
if (this.isShutdown) {
return Promise.reject();
}
return new Promise<Http2Transport>((resolve, reject) => {
let remoteName: string | null;
if (proxyConnectionResult.realTarget) {
@ -767,6 +780,7 @@ export class Http2SubchannelConnector implements SubchannelConnector {
});
});
}
connect(
address: SubchannelAddress,
credentials: ChannelCredentials,

View File

@ -31,7 +31,7 @@ import {
HealthListener,
SubchannelInterface,
} from '../src/subchannel-interface';
import { SubchannelRef } from '../src/channelz';
import { EntityTypes, SubchannelRef } from '../src/channelz';
import { Subchannel } from '../src/subchannel';
import { ConnectivityState } from '../src/connectivity-state';
@ -196,7 +196,7 @@ export class MockSubchannel implements SubchannelInterface {
unref(): void {}
getChannelzRef(): SubchannelRef {
return {
kind: 'subchannel',
kind: EntityTypes.subchannel,
id: -1,
name: this.address,
};