From e0b900dd6903922b66bbfd2e17f00450d12f66b6 Mon Sep 17 00:00:00 2001 From: AVVS Date: Tue, 27 Feb 2024 12:27:25 -0800 Subject: [PATCH] feat: channelz improvements, idle timeout implementation --- packages/grpc-js/.eslintrc | 1 + packages/grpc-js/gulpfile.ts | 33 +- packages/grpc-js/package.json | 37 +- packages/grpc-js/src/channel.ts | 2 +- packages/grpc-js/src/channelz.ts | 456 +++++------ .../src/load-balancer-child-handler.ts | 2 +- packages/grpc-js/src/load-balancer.ts | 2 +- packages/grpc-js/src/server-call.ts | 18 +- packages/grpc-js/src/server.ts | 748 ++++++++++++------ packages/grpc-js/src/subchannel-interface.ts | 2 +- packages/grpc-js/src/subchannel.ts | 38 +- packages/grpc-js/src/transport.ts | 16 +- packages/grpc-js/test/common.ts | 4 +- 13 files changed, 813 insertions(+), 546 deletions(-) diff --git a/packages/grpc-js/.eslintrc b/packages/grpc-js/.eslintrc index 2f6bfd62..9a72b31d 100644 --- a/packages/grpc-js/.eslintrc +++ b/packages/grpc-js/.eslintrc @@ -50,6 +50,7 @@ "@typescript-eslint/explicit-module-boundary-types": "off", "@typescript-eslint/ban-types": "off", "@typescript-eslint/camelcase": "off", + "@typescript-eslint/no-explicit-any": "off", "node/no-missing-import": "off", "node/no-empty-function": "off", "node/no-unsupported-features/es-syntax": "off", diff --git a/packages/grpc-js/gulpfile.ts b/packages/grpc-js/gulpfile.ts index d8590036..e4e9071f 100644 --- a/packages/grpc-js/gulpfile.ts +++ b/packages/grpc-js/gulpfile.ts @@ -35,14 +35,17 @@ const pkgPath = path.resolve(jsCoreDir, 'package.json'); const supportedVersionRange = require(pkgPath).engines.node; const versionNotSupported = () => { console.log(`Skipping grpc-js task for Node ${process.version}`); - return () => { return Promise.resolve(); }; + return () => { + return Promise.resolve(); + }; }; const identity = (value: any): any => value; -const checkTask = semver.satisfies(process.version, supportedVersionRange) ? - identity : versionNotSupported; +const checkTask = semver.satisfies(process.version, supportedVersionRange) + ? identity + : versionNotSupported; const execNpmVerb = (verb: string, ...args: string[]) => - execa('npm', [verb, ...args], {cwd: jsCoreDir, stdio: 'inherit'}); + execa('npm', [verb, ...args], { cwd: jsCoreDir, stdio: 'inherit' }); const execNpmCommand = execNpmVerb.bind(null, 'run'); const install = checkTask(() => execNpmVerb('install', '--unsafe-perm')); @@ -64,22 +67,20 @@ const cleanAll = gulp.parallel(clean); */ const compile = checkTask(() => execNpmCommand('compile')); -const copyTestFixtures = checkTask(() => ncpP(`${jsCoreDir}/test/fixtures`, `${outDir}/test/fixtures`)); +const copyTestFixtures = checkTask(() => + ncpP(`${jsCoreDir}/test/fixtures`, `${outDir}/test/fixtures`) +); const runTests = checkTask(() => { process.env.GRPC_EXPERIMENTAL_ENABLE_OUTLIER_DETECTION = 'true'; - return gulp.src(`${outDir}/test/**/*.js`) - .pipe(mocha({reporter: 'mocha-jenkins-reporter', - require: ['ts-node/register']})); + return gulp.src(`${outDir}/test/**/*.js`).pipe( + mocha({ + reporter: 'mocha-jenkins-reporter', + require: ['ts-node/register'], + }) + ); }); const test = gulp.series(install, copyTestFixtures, runTests); -export { - install, - lint, - clean, - cleanAll, - compile, - test -} +export { install, lint, clean, cleanAll, compile, test }; diff --git a/packages/grpc-js/package.json b/packages/grpc-js/package.json index d1cb3d56..34d8b558 100644 --- a/packages/grpc-js/package.json +++ b/packages/grpc-js/package.json @@ -6,7 +6,7 @@ "repository": "https://github.com/grpc/grpc-node/tree/master/packages/grpc-js", "main": "build/src/index.js", "engines": { - "node": "^8.13.0 || >=10.10.0" + "node": ">=12.10.0" }, "keywords": [], "author": { @@ -15,17 +15,18 @@ "types": "build/src/index.d.ts", "license": "Apache-2.0", "devDependencies": { - "@types/gulp": "^4.0.6", - "@types/gulp-mocha": "0.0.32", - "@types/lodash": "^4.14.186", - "@types/mocha": "^5.2.6", - "@types/ncp": "^2.0.1", - "@types/pify": "^3.0.2", - "@types/semver": "^7.3.9", - "@typescript-eslint/eslint-plugin": "^5.59.11", - "@typescript-eslint/parser": "^5.59.11", - "@typescript-eslint/typescript-estree": "^5.59.11", - "clang-format": "^1.0.55", + "@types/gulp": "^4.0.17", + "@types/gulp-mocha": "0.0.37", + "@types/lodash": "^4.14.202", + "@types/mocha": "^10.0.6", + "@types/ncp": "^2.0.8", + "@types/pify": "^5.0.4", + "@types/semver": "^7.5.8", + "@types/node": ">=20.11.20", + "@typescript-eslint/eslint-plugin": "^7.1.0", + "@typescript-eslint/parser": "^7.1.0", + "@typescript-eslint/typescript-estree": "^7.1.0", + "clang-format": "^1.8.0", "eslint": "^8.42.0", "eslint-config-prettier": "^8.8.0", "eslint-plugin-node": "^11.1.0", @@ -33,16 +34,16 @@ "execa": "^2.0.3", "gulp": "^4.0.2", "gulp-mocha": "^6.0.0", - "lodash": "^4.17.4", + "lodash": "^4.17.21", "madge": "^5.0.1", "mocha-jenkins-reporter": "^0.4.1", "ncp": "^2.0.0", "pify": "^4.0.1", "prettier": "^2.8.8", "rimraf": "^3.0.2", - "semver": "^7.3.5", - "ts-node": "^10.9.1", - "typescript": "^5.1.3" + "semver": "^7.6.0", + "ts-node": "^10.9.2", + "typescript": "^5.3.3" }, "contributors": [ { @@ -65,8 +66,8 @@ "generate-test-types": "proto-loader-gen-types --keepCase --longs String --enums String --defaults --oneofs --includeComments --include-dirs test/fixtures/ -O test/generated/ --grpcLib ../../src/index test_service.proto" }, "dependencies": { - "@grpc/proto-loader": "^0.7.8", - "@types/node": ">=12.12.47" + "@grpc/proto-loader": "^0.7.10", + "@js-sdsl/ordered-map": "^4.4.2" }, "files": [ "src/**/*.ts", diff --git a/packages/grpc-js/src/channel.ts b/packages/grpc-js/src/channel.ts index 7ce5a15f..514920c8 100644 --- a/packages/grpc-js/src/channel.ts +++ b/packages/grpc-js/src/channel.ts @@ -20,7 +20,7 @@ import { ChannelOptions } from './channel-options'; import { ServerSurfaceCall } from './server-call'; import { ConnectivityState } from './connectivity-state'; -import { ChannelRef } from './channelz'; +import type { ChannelRef } from './channelz'; import { Call } from './call-interface'; import { InternalChannel } from './internal-channel'; import { Deadline } from './deadline'; diff --git a/packages/grpc-js/src/channelz.ts b/packages/grpc-js/src/channelz.ts index 1e2627a9..6d70b754 100644 --- a/packages/grpc-js/src/channelz.ts +++ b/packages/grpc-js/src/channelz.ts @@ -16,6 +16,7 @@ */ import { isIPv4, isIPv6 } from 'net'; +import { OrderedMap, type OrderedMapIterator } from '@js-sdsl/ordered-map'; import { ConnectivityState } from './connectivity-state'; import { Status } from './constants'; import { Timestamp } from './generated/google/protobuf/Timestamp'; @@ -66,24 +67,25 @@ export type TraceSeverity = | 'CT_ERROR'; export interface ChannelRef { - kind: 'channel'; + kind: EntityTypes.channel; id: number; name: string; } export interface SubchannelRef { - kind: 'subchannel'; + kind: EntityTypes.subchannel; id: number; name: string; } export interface ServerRef { - kind: 'server'; + kind: EntityTypes.server; id: number; + name: string; } export interface SocketRef { - kind: 'socket'; + kind: EntityTypes.socket; id: number; name: string; } @@ -131,6 +133,21 @@ interface TraceEvent { */ const TARGET_RETAINED_TRACES = 32; +export class ChannelzTraceStub { + readonly events: TraceEvent[] = []; + readonly creationTimestamp: Date = new Date(); + readonly eventsLogged = 0; + + addTrace(): void {} + getTraceMessage(): ChannelTrace { + return { + creation_timestamp: dateToProtoTimestamp(this.creationTimestamp), + num_events_logged: this.eventsLogged, + events: [], + }; + } +} + export class ChannelzTrace { events: TraceEvent[] = []; creationTimestamp: Date; @@ -182,105 +199,64 @@ export class ChannelzTrace { } export class ChannelzChildrenTracker { - private channelChildren: Map = - new Map(); - private subchannelChildren: Map< + private channelChildren = new OrderedMap< + number, + { ref: ChannelRef; count: number } + >(); + private subchannelChildren = new OrderedMap< number, { ref: SubchannelRef; count: number } - > = new Map(); - private socketChildren: Map = - new Map(); + >(); + private socketChildren = new OrderedMap< + number, + { ref: SocketRef; count: number } + >(); + private trackerMap = { + [EntityTypes.channel]: this.channelChildren, + [EntityTypes.subchannel]: this.subchannelChildren, + [EntityTypes.socket]: this.socketChildren, + } as const; refChild(child: ChannelRef | SubchannelRef | SocketRef) { - switch (child.kind) { - case 'channel': { - const trackedChild = this.channelChildren.get(child.id) ?? { - ref: child, - count: 0, - }; - trackedChild.count += 1; - this.channelChildren.set(child.id, trackedChild); - break; - } - case 'subchannel': { - const trackedChild = this.subchannelChildren.get(child.id) ?? { - ref: child, - count: 0, - }; - trackedChild.count += 1; - this.subchannelChildren.set(child.id, trackedChild); - break; - } - case 'socket': { - const trackedChild = this.socketChildren.get(child.id) ?? { - ref: child, - count: 0, - }; - trackedChild.count += 1; - this.socketChildren.set(child.id, trackedChild); - break; - } + const tracker = this.trackerMap[child.kind]; + const trackedChild = tracker.getElementByKey(child.id); + + if (trackedChild === undefined) { + tracker.setElement(child.id, { + // @ts-expect-error union issues + ref: child, + count: 1, + }); + } else { + trackedChild.count += 1; } } unrefChild(child: ChannelRef | SubchannelRef | SocketRef) { - switch (child.kind) { - case 'channel': { - const trackedChild = this.channelChildren.get(child.id); - if (trackedChild !== undefined) { - trackedChild.count -= 1; - if (trackedChild.count === 0) { - this.channelChildren.delete(child.id); - } else { - this.channelChildren.set(child.id, trackedChild); - } - } - break; - } - case 'subchannel': { - const trackedChild = this.subchannelChildren.get(child.id); - if (trackedChild !== undefined) { - trackedChild.count -= 1; - if (trackedChild.count === 0) { - this.subchannelChildren.delete(child.id); - } else { - this.subchannelChildren.set(child.id, trackedChild); - } - } - break; - } - case 'socket': { - const trackedChild = this.socketChildren.get(child.id); - if (trackedChild !== undefined) { - trackedChild.count -= 1; - if (trackedChild.count === 0) { - this.socketChildren.delete(child.id); - } else { - this.socketChildren.set(child.id, trackedChild); - } - } - break; + const tracker = this.trackerMap[child.kind]; + const trackedChild = tracker.getElementByKey(child.id); + if (trackedChild !== undefined) { + trackedChild.count -= 1; + if (trackedChild.count === 0) { + tracker.eraseElementByKey(child.id); } } } getChildLists(): ChannelzChildren { - const channels: ChannelRef[] = []; - for (const { ref } of this.channelChildren.values()) { - channels.push(ref); - } - const subchannels: SubchannelRef[] = []; - for (const { ref } of this.subchannelChildren.values()) { - subchannels.push(ref); - } - const sockets: SocketRef[] = []; - for (const { ref } of this.socketChildren.values()) { - sockets.push(ref); - } - return { channels, subchannels, sockets }; + return { + channels: this.channelChildren, + subchannels: this.subchannelChildren, + sockets: this.socketChildren, + }; } } +export class ChannelzChildrenTrackerStub extends ChannelzChildrenTracker { + override refChild(): void {} + override unrefChild(): void {} +} + export class ChannelzCallTracker { callsStarted = 0; callsSucceeded = 0; @@ -299,17 +275,23 @@ export class ChannelzCallTracker { } } +export class ChannelzCallTrackerStub extends ChannelzCallTracker { + override addCallStarted() {} + override addCallSucceeded() {} + override addCallFailed() {} +} + export interface ChannelzChildren { - channels: ChannelRef[]; - subchannels: SubchannelRef[]; - sockets: SocketRef[]; + channels: OrderedMap; + subchannels: OrderedMap; + sockets: OrderedMap; } export interface ChannelInfo { target: string; state: ConnectivityState; - trace: ChannelzTrace; - callTracker: ChannelzCallTracker; + trace: ChannelzTrace | ChannelzTraceStub; + callTracker: ChannelzCallTracker | ChannelzCallTrackerStub; children: ChannelzChildren; } @@ -348,105 +330,102 @@ export interface SocketInfo { remoteFlowControlWindow: number | null; } -interface ChannelEntry { +type ChannelEntry = { ref: ChannelRef; getInfo(): ChannelInfo; -} +}; -interface SubchannelEntry { +type SubchannelEntry = { ref: SubchannelRef; getInfo(): SubchannelInfo; -} +}; -interface ServerEntry { +type ServerEntry = { ref: ServerRef; getInfo(): ServerInfo; -} +}; -interface SocketEntry { +type SocketEntry = { ref: SocketRef; getInfo(): SocketInfo; +}; + +export const enum EntityTypes { + channel = 'channel', + subchannel = 'subchannel', + server = 'server', + socket = 'socket', } -let nextId = 1; +const entityMaps = { + [EntityTypes.channel]: new OrderedMap(), + [EntityTypes.subchannel]: new OrderedMap(), + [EntityTypes.server]: new OrderedMap(), + [EntityTypes.socket]: new OrderedMap(), +} as const; -function getNextId(): number { - return nextId++; -} +export type RefByType = T extends EntityTypes.channel + ? ChannelRef + : T extends EntityTypes.server + ? ServerRef + : T extends EntityTypes.socket + ? SocketRef + : T extends EntityTypes.subchannel + ? SubchannelRef + : never; -const channels: (ChannelEntry | undefined)[] = []; -const subchannels: (SubchannelEntry | undefined)[] = []; -const servers: (ServerEntry | undefined)[] = []; -const sockets: (SocketEntry | undefined)[] = []; +export type EntryByType = T extends EntityTypes.channel + ? ChannelEntry + : T extends EntityTypes.server + ? ServerEntry + : T extends EntityTypes.socket + ? SocketEntry + : T extends EntityTypes.subchannel + ? SubchannelEntry + : never; -export function registerChannelzChannel( - name: string, - getInfo: () => ChannelInfo, - channelzEnabled: boolean -): ChannelRef { - const id = getNextId(); - const ref: ChannelRef = { id, name, kind: 'channel' }; - if (channelzEnabled) { - channels[id] = { ref, getInfo }; +export type InfoByType = T extends EntityTypes.channel + ? ChannelInfo + : T extends EntityTypes.subchannel + ? SubchannelInfo + : T extends EntityTypes.server + ? ServerInfo + : T extends EntityTypes.socket + ? SocketInfo + : never; + +const generateRegisterFn = (kind: R) => { + let nextId = 1; + function getNextId(): number { + return nextId++; } - return ref; -} -export function registerChannelzSubchannel( - name: string, - getInfo: () => SubchannelInfo, - channelzEnabled: boolean -): SubchannelRef { - const id = getNextId(); - const ref: SubchannelRef = { id, name, kind: 'subchannel' }; - if (channelzEnabled) { - subchannels[id] = { ref, getInfo }; - } - return ref; -} + return ( + name: string, + getInfo: () => InfoByType, + channelzEnabled: boolean + ): RefByType => { + const id = getNextId(); + const ref = { id, name, kind } as RefByType; + if (channelzEnabled) { + // @ts-expect-error typing issues + entityMaps[kind].setElement(id, { ref, getInfo }); + } + return ref; + }; +}; -export function registerChannelzServer( - getInfo: () => ServerInfo, - channelzEnabled: boolean -): ServerRef { - const id = getNextId(); - const ref: ServerRef = { id, kind: 'server' }; - if (channelzEnabled) { - servers[id] = { ref, getInfo }; - } - return ref; -} - -export function registerChannelzSocket( - name: string, - getInfo: () => SocketInfo, - channelzEnabled: boolean -): SocketRef { - const id = getNextId(); - const ref: SocketRef = { id, name, kind: 'socket' }; - if (channelzEnabled) { - sockets[id] = { ref, getInfo }; - } - return ref; -} +export const registerChannelzChannel = generateRegisterFn(EntityTypes.channel); +export const registerChannelzSubchannel = generateRegisterFn( + EntityTypes.subchannel +); +export const registerChannelzServer = generateRegisterFn(EntityTypes.server); +export const registerChannelzSocket = generateRegisterFn(EntityTypes.socket); export function unregisterChannelzRef( ref: ChannelRef | SubchannelRef | ServerRef | SocketRef ) { - switch (ref.kind) { - case 'channel': - delete channels[ref.id]; - return; - case 'subchannel': - delete subchannels[ref.id]; - return; - case 'server': - delete servers[ref.id]; - return; - case 'socket': - delete sockets[ref.id]; - return; - } + entityMaps[ref.kind].eraseElementByKey(ref.id); } /** @@ -556,6 +535,17 @@ function dateToProtoTimestamp(date?: Date | null): Timestamp | null { function getChannelMessage(channelEntry: ChannelEntry): ChannelMessage { const resolvedInfo = channelEntry.getInfo(); + const channelRef: ChannelRefMessage[] = []; + const subchannelRef: SubchannelRefMessage[] = []; + + resolvedInfo.children.channels.forEach(el => { + channelRef.push(channelRefToMessage(el[1].ref)); + }); + + resolvedInfo.children.subchannels.forEach(el => { + subchannelRef.push(subchannelRefToMessage(el[1].ref)); + }); + return { ref: channelRefToMessage(channelEntry.ref), data: { @@ -569,12 +559,8 @@ function getChannelMessage(channelEntry: ChannelEntry): ChannelMessage { ), trace: resolvedInfo.trace.getTraceMessage(), }, - channel_ref: resolvedInfo.children.channels.map(ref => - channelRefToMessage(ref) - ), - subchannel_ref: resolvedInfo.children.subchannels.map(ref => - subchannelRefToMessage(ref) - ), + channel_ref: channelRef, + subchannel_ref: subchannelRef, }; } @@ -582,8 +568,9 @@ function GetChannel( call: ServerUnaryCall, callback: sendUnaryData ): void { - const channelId = Number.parseInt(call.request.channel_id); - const channelEntry = channels[channelId]; + const channelId = parseInt(call.request.channel_id, 10); + const channelEntry = + entityMaps[EntityTypes.channel].getElementByKey(channelId); if (channelEntry === undefined) { callback({ code: Status.NOT_FOUND, @@ -598,27 +585,34 @@ function GetTopChannels( call: ServerUnaryCall, callback: sendUnaryData ): void { - const maxResults = Number.parseInt(call.request.max_results); + const maxResults = parseInt(call.request.max_results, 10) || 100; const resultList: ChannelMessage[] = []; - let i = Number.parseInt(call.request.start_channel_id); - for (; i < channels.length; i++) { - const channelEntry = channels[i]; - if (channelEntry === undefined) { - continue; - } - resultList.push(getChannelMessage(channelEntry)); - if (resultList.length >= maxResults) { - break; - } + const startId = parseInt(call.request.start_channel_id, 10); + const channelEntries = entityMaps[EntityTypes.channel]; + + let i: OrderedMapIterator; + for ( + i = channelEntries.lowerBound(startId); + !i.equals(channelEntries.end()) && resultList.length < maxResults; + i = i.next() + ) { + resultList.push(getChannelMessage(i.pointer[1])); } + callback(null, { channel: resultList, - end: i >= servers.length, + end: i.equals(channelEntries.end()), }); } function getServerMessage(serverEntry: ServerEntry): ServerMessage { const resolvedInfo = serverEntry.getInfo(); + const listenSocket: SocketRefMessage[] = []; + + resolvedInfo.listenerChildren.sockets.forEach(el => { + listenSocket.push(socketRefToMessage(el[1].ref)); + }); + return { ref: serverRefToMessage(serverEntry.ref), data: { @@ -630,9 +624,7 @@ function getServerMessage(serverEntry: ServerEntry): ServerMessage { ), trace: resolvedInfo.trace.getTraceMessage(), }, - listen_socket: resolvedInfo.listenerChildren.sockets.map(ref => - socketRefToMessage(ref) - ), + listen_socket: listenSocket, }; } @@ -640,8 +632,9 @@ function GetServer( call: ServerUnaryCall, callback: sendUnaryData ): void { - const serverId = Number.parseInt(call.request.server_id); - const serverEntry = servers[serverId]; + const serverId = parseInt(call.request.server_id, 10); + const serverEntries = entityMaps[EntityTypes.server]; + const serverEntry = serverEntries.getElementByKey(serverId); if (serverEntry === undefined) { callback({ code: Status.NOT_FOUND, @@ -656,22 +649,23 @@ function GetServers( call: ServerUnaryCall, callback: sendUnaryData ): void { - const maxResults = Number.parseInt(call.request.max_results); + const maxResults = parseInt(call.request.max_results, 10) || 100; + const startId = parseInt(call.request.start_server_id, 10); + const serverEntries = entityMaps[EntityTypes.server]; const resultList: ServerMessage[] = []; - let i = Number.parseInt(call.request.start_server_id); - for (; i < servers.length; i++) { - const serverEntry = servers[i]; - if (serverEntry === undefined) { - continue; - } - resultList.push(getServerMessage(serverEntry)); - if (resultList.length >= maxResults) { - break; - } + + let i: OrderedMapIterator; + for ( + i = serverEntries.lowerBound(startId); + !i.equals(serverEntries.end()) && resultList.length < maxResults; + i = i.next() + ) { + resultList.push(getServerMessage(i.pointer[1])); } + callback(null, { server: resultList, - end: i >= servers.length, + end: i.equals(serverEntries.end()), }); } @@ -679,8 +673,9 @@ function GetSubchannel( call: ServerUnaryCall, callback: sendUnaryData ): void { - const subchannelId = Number.parseInt(call.request.subchannel_id); - const subchannelEntry = subchannels[subchannelId]; + const subchannelId = parseInt(call.request.subchannel_id, 10); + const subchannelEntry = + entityMaps[EntityTypes.subchannel].getElementByKey(subchannelId); if (subchannelEntry === undefined) { callback({ code: Status.NOT_FOUND, @@ -689,6 +684,12 @@ function GetSubchannel( return; } const resolvedInfo = subchannelEntry.getInfo(); + const listenSocket: SocketRefMessage[] = []; + + resolvedInfo.children.sockets.forEach(el => { + listenSocket.push(socketRefToMessage(el[1].ref)); + }); + const subchannelMessage: SubchannelMessage = { ref: subchannelRefToMessage(subchannelEntry.ref), data: { @@ -702,9 +703,7 @@ function GetSubchannel( ), trace: resolvedInfo.trace.getTraceMessage(), }, - socket_ref: resolvedInfo.children.sockets.map(ref => - socketRefToMessage(ref) - ), + socket_ref: listenSocket, }; callback(null, { subchannel: subchannelMessage }); } @@ -735,8 +734,8 @@ function GetSocket( call: ServerUnaryCall, callback: sendUnaryData ): void { - const socketId = Number.parseInt(call.request.socket_id); - const socketEntry = sockets[socketId]; + const socketId = parseInt(call.request.socket_id, 10); + const socketEntry = entityMaps[EntityTypes.socket].getElementByKey(socketId); if (socketEntry === undefined) { callback({ code: Status.NOT_FOUND, @@ -809,8 +808,9 @@ function GetServerSockets( >, callback: sendUnaryData ): void { - const serverId = Number.parseInt(call.request.server_id); - const serverEntry = servers[serverId]; + const serverId = parseInt(call.request.server_id, 10); + const serverEntry = entityMaps[EntityTypes.server].getElementByKey(serverId); + if (serverEntry === undefined) { callback({ code: Status.NOT_FOUND, @@ -818,28 +818,28 @@ function GetServerSockets( }); return; } - const startId = Number.parseInt(call.request.start_socket_id); - const maxResults = Number.parseInt(call.request.max_results); + + const startId = parseInt(call.request.start_socket_id, 10); + const maxResults = parseInt(call.request.max_results, 10) || 100; const resolvedInfo = serverEntry.getInfo(); // If we wanted to include listener sockets in the result, this line would // instead say // const allSockets = resolvedInfo.listenerChildren.sockets.concat(resolvedInfo.sessionChildren.sockets).sort((ref1, ref2) => ref1.id - ref2.id); - const allSockets = resolvedInfo.sessionChildren.sockets.sort( - (ref1, ref2) => ref1.id - ref2.id - ); + const allSockets = resolvedInfo.sessionChildren.sockets; const resultList: SocketRefMessage[] = []; - let i = 0; - for (; i < allSockets.length; i++) { - if (allSockets[i].id >= startId) { - resultList.push(socketRefToMessage(allSockets[i])); - if (resultList.length >= maxResults) { - break; - } - } + + let i: OrderedMapIterator; + for ( + i = allSockets.lowerBound(startId); + !i.equals(allSockets.end()) && resultList.length < maxResults; + i = i.next() + ) { + resultList.push(socketRefToMessage(i.pointer[1].ref)); } + callback(null, { socket_ref: resultList, - end: i >= allSockets.length, + end: i.equals(allSockets.end()), }); } diff --git a/packages/grpc-js/src/load-balancer-child-handler.ts b/packages/grpc-js/src/load-balancer-child-handler.ts index a29d6c92..352ea7b8 100644 --- a/packages/grpc-js/src/load-balancer-child-handler.ts +++ b/packages/grpc-js/src/load-balancer-child-handler.ts @@ -25,7 +25,7 @@ import { Endpoint, SubchannelAddress } from './subchannel-address'; import { ChannelOptions } from './channel-options'; import { ConnectivityState } from './connectivity-state'; import { Picker } from './picker'; -import { ChannelRef, SubchannelRef } from './channelz'; +import type { ChannelRef, SubchannelRef } from './channelz'; import { SubchannelInterface } from './subchannel-interface'; const TYPE_NAME = 'child_load_balancer_helper'; diff --git a/packages/grpc-js/src/load-balancer.ts b/packages/grpc-js/src/load-balancer.ts index f8071317..fb353a59 100644 --- a/packages/grpc-js/src/load-balancer.ts +++ b/packages/grpc-js/src/load-balancer.ts @@ -19,7 +19,7 @@ import { ChannelOptions } from './channel-options'; import { Endpoint, SubchannelAddress } from './subchannel-address'; import { ConnectivityState } from './connectivity-state'; import { Picker } from './picker'; -import { ChannelRef, SubchannelRef } from './channelz'; +import type { ChannelRef, SubchannelRef } from './channelz'; import { SubchannelInterface } from './subchannel-interface'; import { LoadBalancingConfig } from './service-config'; import { log } from './logging'; diff --git a/packages/grpc-js/src/server-call.ts b/packages/grpc-js/src/server-call.ts index 95393fba..d5aababf 100644 --- a/packages/grpc-js/src/server-call.ts +++ b/packages/grpc-js/src/server-call.ts @@ -19,12 +19,12 @@ import { EventEmitter } from 'events'; import { Duplex, Readable, Writable } from 'stream'; import { Status } from './constants'; -import { Deserialize, Serialize } from './make-client'; +import type { Deserialize, Serialize } from './make-client'; import { Metadata } from './metadata'; -import { ObjectReadable, ObjectWritable } from './object-stream'; -import { StatusObject, PartialStatusObject } from './call-interface'; -import { Deadline } from './deadline'; -import { ServerInterceptingCallInterface } from './server-interceptors'; +import type { ObjectReadable, ObjectWritable } from './object-stream'; +import type { StatusObject, PartialStatusObject } from './call-interface'; +import type { Deadline } from './deadline'; +import type { ServerInterceptingCallInterface } from './server-interceptors'; export type ServerStatusResponse = Partial; @@ -330,7 +330,7 @@ export interface UnaryHandler { func: handleUnaryCall; serialize: Serialize; deserialize: Deserialize; - type: HandlerType; + type: 'unary'; path: string; } @@ -338,7 +338,7 @@ export interface ClientStreamingHandler { func: handleClientStreamingCall; serialize: Serialize; deserialize: Deserialize; - type: HandlerType; + type: 'clientStream'; path: string; } @@ -346,7 +346,7 @@ export interface ServerStreamingHandler { func: handleServerStreamingCall; serialize: Serialize; deserialize: Deserialize; - type: HandlerType; + type: 'serverStream'; path: string; } @@ -354,7 +354,7 @@ export interface BidiStreamingHandler { func: handleBidiStreamingCall; serialize: Serialize; deserialize: Deserialize; - type: HandlerType; + type: 'bidi'; path: string; } diff --git a/packages/grpc-js/src/server.ts b/packages/grpc-js/src/server.ts index 46bd22ea..0a5bc0e9 100644 --- a/packages/grpc-js/src/server.ts +++ b/packages/grpc-js/src/server.ts @@ -64,8 +64,11 @@ import { } from './uri-parser'; import { ChannelzCallTracker, + ChannelzCallTrackerStub, ChannelzChildrenTracker, + ChannelzChildrenTrackerStub, ChannelzTrace, + ChannelzTraceStub, registerChannelzServer, registerChannelzSocket, ServerInfo, @@ -87,6 +90,7 @@ import { CallEventTracker } from './transport'; const UNLIMITED_CONNECTION_AGE_MS = ~(1 << 31); const KEEPALIVE_MAX_TIME_MS = ~(1 << 31); const KEEPALIVE_TIMEOUT_MS = 20000; +const MAX_CONNECTION_IDLE_MS = 30 * 60 * 1e3; // 30 min const { HTTP2_HEADER_PATH } = http2.constants; @@ -177,9 +181,10 @@ function getDefaultHandler(handlerType: HandlerType, methodName: string) { interface ChannelzSessionInfo { ref: SocketRef; - streamTracker: ChannelzCallTracker; + streamTracker: ChannelzCallTracker | ChannelzCallTrackerStub; messagesSent: number; messagesReceived: number; + keepAlivesSent: number; lastMessageSentTimestamp: Date | null; lastMessageReceivedTimestamp: Date | null; } @@ -243,6 +248,13 @@ export interface ServerOptions extends ChannelOptions { export class Server { private boundPorts: Map = new Map(); private http2Servers: Map = new Map(); + private sessionIdleTimeouts = new Map< + http2.Http2Session, + { + activeStreams: number; + timeout: NodeJS.Timeout | null; + } + >(); private handlers: Map = new Map< string, @@ -261,10 +273,14 @@ export class Server { // Channelz Info private readonly channelzEnabled: boolean = true; private channelzRef: ServerRef; - private channelzTrace = new ChannelzTrace(); - private callTracker = new ChannelzCallTracker(); - private listenerChildrenTracker = new ChannelzChildrenTracker(); - private sessionChildrenTracker = new ChannelzChildrenTracker(); + private channelzTrace: ChannelzTrace | ChannelzTraceStub; + private callTracker: ChannelzCallTracker | ChannelzCallTrackerStub; + private listenerChildrenTracker: + | ChannelzChildrenTracker + | ChannelzChildrenTrackerStub; + private sessionChildrenTracker: + | ChannelzChildrenTracker + | ChannelzChildrenTrackerStub; private readonly maxConnectionAgeMs: number; private readonly maxConnectionAgeGraceMs: number; @@ -272,6 +288,8 @@ export class Server { private readonly keepaliveTimeMs: number; private readonly keepaliveTimeoutMs: number; + private readonly sessionIdleTimeout: number; + private readonly interceptors: ServerInterceptor[]; /** @@ -284,14 +302,24 @@ export class Server { this.options = options ?? {}; if (this.options['grpc.enable_channelz'] === 0) { this.channelzEnabled = false; + this.channelzTrace = new ChannelzTraceStub(); + this.callTracker = new ChannelzCallTrackerStub(); + this.listenerChildrenTracker = new ChannelzChildrenTrackerStub(); + this.sessionChildrenTracker = new ChannelzChildrenTrackerStub(); + } else { + this.channelzTrace = new ChannelzTrace(); + this.callTracker = new ChannelzCallTracker(); + this.listenerChildrenTracker = new ChannelzChildrenTracker(); + this.sessionChildrenTracker = new ChannelzChildrenTracker(); } + this.channelzRef = registerChannelzServer( + 'server', () => this.getChannelzInfo(), this.channelzEnabled ); - if (this.channelzEnabled) { - this.channelzTrace.addTrace('CT_INFO', 'Server created'); - } + + this.channelzTrace.addTrace('CT_INFO', 'Server created'); this.maxConnectionAgeMs = this.options['grpc.max_connection_age_ms'] ?? UNLIMITED_CONNECTION_AGE_MS; this.maxConnectionAgeGraceMs = @@ -301,6 +329,9 @@ export class Server { this.options['grpc.keepalive_time_ms'] ?? KEEPALIVE_MAX_TIME_MS; this.keepaliveTimeoutMs = this.options['grpc.keepalive_timeout_ms'] ?? KEEPALIVE_TIMEOUT_MS; + this.sessionIdleTimeout = + this.options['grpc.max_connection_idle'] ?? MAX_CONNECTION_IDLE_MS; + this.commonServerOptions = { maxSendHeaderBlockLength: Number.MAX_SAFE_INTEGER, }; @@ -382,7 +413,7 @@ export class Server { streamsFailed: sessionInfo.streamTracker.callsFailed, messagesSent: sessionInfo.messagesSent, messagesReceived: sessionInfo.messagesReceived, - keepAlivesSent: 0, + keepAlivesSent: sessionInfo.keepAlivesSent, lastLocalStreamCreatedTimestamp: null, lastRemoteStreamCreatedTimestamp: sessionInfo.streamTracker.lastCallStartedTimestamp, @@ -581,9 +612,8 @@ export class Server { const channelzRef = this.registerListenerToChannelz( boundSubchannelAddress ); - if (this.channelzEnabled) { - this.listenerChildrenTracker.refChild(channelzRef); - } + this.listenerChildrenTracker.refChild(channelzRef); + this.http2Servers.set(http2Server, { channelzRef: channelzRef, sessions: new Set(), @@ -854,7 +884,7 @@ export class Server { ); const serverInfo = this.http2Servers.get(server); server.close(() => { - if (this.channelzEnabled && serverInfo) { + if (serverInfo) { this.listenerChildrenTracker.unrefChild(serverInfo.channelzRef); unregisterChannelzRef(serverInfo.channelzRef); } @@ -870,15 +900,15 @@ export class Server { this.trace('Closing session initiated by ' + session.socket?.remoteAddress); const sessionInfo = this.sessions.get(session); const closeCallback = () => { - if (this.channelzEnabled && sessionInfo) { + if (sessionInfo) { this.sessionChildrenTracker.unrefChild(sessionInfo.ref); unregisterChannelzRef(sessionInfo.ref); + this.sessions.delete(session); } - this.sessions.delete(session); callback?.(); }; if (session.closed) { - process.nextTick(closeCallback); + queueMicrotask(closeCallback); } else { session.close(closeCallback); } @@ -956,14 +986,13 @@ export class Server { const allSessions: Set = new Set(); for (const http2Server of boundPortObject.listeningServers) { const serverEntry = this.http2Servers.get(http2Server); - if (!serverEntry) { - continue; - } - for (const session of serverEntry.sessions) { - allSessions.add(session); - this.closeSession(session, () => { - allSessions.delete(session); - }); + if (serverEntry) { + for (const session of serverEntry.sessions) { + allSessions.add(session); + this.closeSession(session, () => { + allSessions.delete(session); + }); + } } } /* After the grace time ends, send another goaway to all remaining sessions @@ -995,9 +1024,7 @@ export class Server { session.destroy(http2.constants.NGHTTP2_CANCEL as any); }); this.sessions.clear(); - if (this.channelzEnabled) { - unregisterChannelzRef(this.channelzRef); - } + unregisterChannelzRef(this.channelzRef); this.shutdown = true; } @@ -1049,9 +1076,7 @@ export class Server { tryShutdown(callback: (error?: Error) => void): void { const wrappedCallback = (error?: Error) => { - if (this.channelzEnabled) { - unregisterChannelzRef(this.channelzRef); - } + unregisterChannelzRef(this.channelzRef); callback(error); }; let pendingChecks = 0; @@ -1065,24 +1090,26 @@ export class Server { } this.shutdown = true; - for (const server of this.http2Servers.keys()) { + for (const [serverKey, server] of this.http2Servers.entries()) { pendingChecks++; - const serverString = this.http2Servers.get(server)!.channelzRef.name; + const serverString = server.channelzRef.name; this.trace('Waiting for server ' + serverString + ' to close'); - this.closeServer(server, () => { + this.closeServer(serverKey, () => { this.trace('Server ' + serverString + ' finished closing'); maybeCallback(); }); + + for (const session of server.sessions.keys()) { + pendingChecks++; + const sessionString = session.socket?.remoteAddress; + this.trace('Waiting for session ' + sessionString + ' to close'); + this.closeSession(session, () => { + this.trace('Session ' + sessionString + ' finished closing'); + maybeCallback(); + }); + } } - for (const session of this.sessions.keys()) { - pendingChecks++; - const sessionString = session.socket?.remoteAddress; - this.trace('Waiting for session ' + sessionString + ' to close'); - this.closeSession(session, () => { - this.trace('Session ' + sessionString + ' finished closing'); - maybeCallback(); - }); - } + if (pendingChecks === 0) { wrappedCallback(); } @@ -1160,142 +1187,395 @@ export class Server { }; stream.respond(trailersToSend, { endStream: true }); - if (this.channelzEnabled) { - this.callTracker.addCallFailed(); - channelzSessionInfo?.streamTracker.addCallFailed(); - } + this.callTracker.addCallFailed(); + channelzSessionInfo?.streamTracker.addCallFailed(); } - private _channelzHandler( - stream: http2.ServerHttp2Stream, - headers: http2.IncomingHttpHeaders + private _sessionHandler( + http2Server: http2.Http2Server | http2.Http2SecureServer ) { - const channelzSessionInfo = this.sessions.get( - stream.session as http2.ServerHttp2Session - ); + return (session: http2.ServerHttp2Session) => { + this.http2Servers.get(http2Server)?.sessions.add(session); - this.callTracker.addCallStarted(); - channelzSessionInfo?.streamTracker.addCallStarted(); + let connectionAgeTimer: NodeJS.Timeout | null = null; + let connectionAgeGraceTimer: NodeJS.Timeout | null = null; + let sessionClosedByServer = false; - if (!this._verifyContentType(stream, headers)) { - this.callTracker.addCallFailed(); - channelzSessionInfo?.streamTracker.addCallFailed(); - return; - } + const idleTimeoutObj = this.enableIdleTimeout(session); - const path = headers[HTTP2_HEADER_PATH] as string; + if (this.maxConnectionAgeMs !== UNLIMITED_CONNECTION_AGE_MS) { + // Apply a random jitter within a +/-10% range + const jitterMagnitude = this.maxConnectionAgeMs / 10; + const jitter = Math.random() * jitterMagnitude * 2 - jitterMagnitude; - const handler = this._retrieveHandler(path); - if (!handler) { - this._respondWithError( - getUnimplementedStatusResponse(path), - stream, - channelzSessionInfo - ); - return; - } + connectionAgeTimer = setTimeout(() => { + sessionClosedByServer = true; - const callEventTracker: CallEventTracker = { - addMessageSent: () => { - if (channelzSessionInfo) { - channelzSessionInfo.messagesSent += 1; - channelzSessionInfo.lastMessageSentTimestamp = new Date(); - } - }, - addMessageReceived: () => { - if (channelzSessionInfo) { - channelzSessionInfo.messagesReceived += 1; - channelzSessionInfo.lastMessageReceivedTimestamp = new Date(); - } - }, - onCallEnd: status => { - if (status.code === Status.OK) { - this.callTracker.addCallSucceeded(); - } else { - this.callTracker.addCallFailed(); - } - }, - onStreamEnd: success => { - if (channelzSessionInfo) { - if (success) { - channelzSessionInfo.streamTracker.addCallSucceeded(); - } else { - channelzSessionInfo.streamTracker.addCallFailed(); + this.trace( + `Connection dropped by max connection age: ${session.socket?.remoteAddress}` + ); + + try { + session.goaway( + http2.constants.NGHTTP2_NO_ERROR, + ~(1 << 31), + Buffer.from('max_age') + ); + } catch (e) { + // The goaway can't be sent because the session is already closed + session.destroy(); + return; } + session.close(); + + /* Allow a grace period after sending the GOAWAY before forcibly + * closing the connection. */ + if (this.maxConnectionAgeGraceMs !== UNLIMITED_CONNECTION_AGE_MS) { + connectionAgeGraceTimer = setTimeout(() => { + session.destroy(); + }, this.maxConnectionAgeGraceMs).unref?.(); + } + }, this.maxConnectionAgeMs + jitter).unref?.(); + } + + const keeapliveTimeTimer: NodeJS.Timeout | null = setInterval(() => { + const timeoutTImer = setTimeout(() => { + sessionClosedByServer = true; + session.close(); + }, this.keepaliveTimeoutMs).unref?.(); + + try { + session.ping( + (err: Error | null, duration: number, payload: Buffer) => { + clearTimeout(timeoutTImer); + + if (err) { + sessionClosedByServer = true; + this.trace( + `Connection dropped due to error of a ping frame ${err.message} return in ${duration}` + ); + session.close(); + } + } + ); + } catch (e) { + // The ping can't be sent because the session is already closed + session.destroy(); } - }, - }; + }, this.keepaliveTimeMs).unref?.(); - const call = getServerInterceptingCall( - this.interceptors, - stream, - headers, - callEventTracker, - handler, - this.options - ); + session.on('close', () => { + if (!sessionClosedByServer) { + this.trace( + `Connection dropped by client ${session.socket?.remoteAddress}` + ); + } - if (!this._runHandlerForCall(call, handler)) { - this.callTracker.addCallFailed(); - channelzSessionInfo?.streamTracker.addCallFailed(); + if (connectionAgeTimer) { + clearTimeout(connectionAgeTimer); + } - call.sendStatus({ - code: Status.INTERNAL, - details: `Unknown handler type: ${handler.type}`, + if (connectionAgeGraceTimer) { + clearTimeout(connectionAgeGraceTimer); + } + + if (keeapliveTimeTimer) { + clearTimeout(keeapliveTimeTimer); + } + + clearTimeout(idleTimeoutObj.timeout); + this.sessionIdleTimeouts.delete(session); + + this.http2Servers.get(http2Server)?.sessions.delete(session); }); - } + }; } - private _streamHandler( - stream: http2.ServerHttp2Stream, - headers: http2.IncomingHttpHeaders + private _channelzSessionHandler( + http2Server: http2.Http2Server | http2.Http2SecureServer ) { - if (this._verifyContentType(stream, headers) !== true) { - return; - } - - const path = headers[HTTP2_HEADER_PATH] as string; - - const handler = this._retrieveHandler(path); - if (!handler) { - this._respondWithError( - getUnimplementedStatusResponse(path), - stream, - null + return (session: http2.ServerHttp2Session) => { + const channelzRef = registerChannelzSocket( + session.socket?.remoteAddress ?? 'unknown', + this.getChannelzSessionInfoGetter(session), + this.channelzEnabled ); - return; - } - const call = getServerInterceptingCall( - this.interceptors, - stream, - headers, - null, - handler, - this.options - ); + const channelzSessionInfo: ChannelzSessionInfo = { + ref: channelzRef, + streamTracker: this.channelzEnabled + ? new ChannelzCallTracker() + : new ChannelzCallTrackerStub(), + messagesSent: 0, + messagesReceived: 0, + keepAlivesSent: 0, + lastMessageSentTimestamp: null, + lastMessageReceivedTimestamp: null, + }; - if (!this._runHandlerForCall(call, handler)) { - call.sendStatus({ - code: Status.INTERNAL, - details: `Unknown handler type: ${handler.type}`, + this.http2Servers.get(http2Server)?.sessions.add(session); + this.sessions.set(session, channelzSessionInfo); + const clientAddress = `${session.socket.remoteAddress}:${session.socket.remotePort}`; + + this.channelzTrace.addTrace( + 'CT_INFO', + 'Connection established by client ' + clientAddress + ); + this.trace('Connection established by client ' + clientAddress); + this.sessionChildrenTracker.refChild(channelzRef); + + let connectionAgeTimer: NodeJS.Timeout | null = null; + let connectionAgeGraceTimer: NodeJS.Timeout | null = null; + let sessionClosedByServer = false; + + const idleTimeoutObj = this.enableIdleTimeout(session); + + if (this.maxConnectionAgeMs !== UNLIMITED_CONNECTION_AGE_MS) { + // Apply a random jitter within a +/-10% range + const jitterMagnitude = this.maxConnectionAgeMs / 10; + const jitter = Math.random() * jitterMagnitude * 2 - jitterMagnitude; + + connectionAgeTimer = setTimeout(() => { + sessionClosedByServer = true; + this.channelzTrace.addTrace( + 'CT_INFO', + 'Connection dropped by max connection age from ' + clientAddress + ); + + try { + session.goaway( + http2.constants.NGHTTP2_NO_ERROR, + ~(1 << 31), + Buffer.from('max_age') + ); + } catch (e) { + // The goaway can't be sent because the session is already closed + session.destroy(); + return; + } + session.close(); + + /* Allow a grace period after sending the GOAWAY before forcibly + * closing the connection. */ + if (this.maxConnectionAgeGraceMs !== UNLIMITED_CONNECTION_AGE_MS) { + connectionAgeGraceTimer = setTimeout(() => { + session.destroy(); + }, this.maxConnectionAgeGraceMs).unref?.(); + } + }, this.maxConnectionAgeMs + jitter).unref?.(); + } + + const keeapliveTimeTimer: NodeJS.Timeout | null = setInterval(() => { + const timeoutTImer = setTimeout(() => { + sessionClosedByServer = true; + this.channelzTrace.addTrace( + 'CT_INFO', + 'Connection dropped by keepalive timeout from ' + clientAddress + ); + + session.close(); + }, this.keepaliveTimeoutMs).unref?.(); + try { + session.ping( + (err: Error | null, duration: number, payload: Buffer) => { + clearTimeout(timeoutTImer); + + if (err) { + sessionClosedByServer = true; + this.channelzTrace.addTrace( + 'CT_INFO', + `Connection dropped due to error of a ping frame ${err.message} return in ${duration}` + ); + + session.close(); + } + } + ); + channelzSessionInfo.keepAlivesSent += 1; + } catch (e) { + // The ping can't be sent because the session is already closed + session.destroy(); + } + }, this.keepaliveTimeMs).unref?.(); + + session.on('close', () => { + if (!sessionClosedByServer) { + this.channelzTrace.addTrace( + 'CT_INFO', + 'Connection dropped by client ' + clientAddress + ); + } + this.trace( + `DROPPING ${channelzRef.name} - ${channelzRef.kind} - ${channelzRef.id}` + ); + this.sessionChildrenTracker.unrefChild(channelzRef); + unregisterChannelzRef(channelzRef); + + if (connectionAgeTimer) { + clearTimeout(connectionAgeTimer); + } + + if (connectionAgeGraceTimer) { + clearTimeout(connectionAgeGraceTimer); + } + + if (keeapliveTimeTimer) { + clearTimeout(keeapliveTimeTimer); + } + + clearTimeout(idleTimeoutObj.timeout); + this.sessionIdleTimeouts.delete(session); + + this.http2Servers.get(http2Server)?.sessions.delete(session); + this.sessions.delete(session); }); - } + }; + } + + private _channelzHandler(http2Server: http2.Http2Server) { + return ( + stream: http2.ServerHttp2Stream, + headers: http2.IncomingHttpHeaders + ) => { + // for handling idle timeout + this.onStreamOpened(stream); + + const channelzSessionInfo = this.sessions.get( + stream.session as http2.ServerHttp2Session + ); + + this.callTracker.addCallStarted(); + channelzSessionInfo?.streamTracker.addCallStarted(); + + if (!this._verifyContentType(stream, headers)) { + this.callTracker.addCallFailed(); + channelzSessionInfo?.streamTracker.addCallFailed(); + return; + } + + const path = headers[HTTP2_HEADER_PATH] as string; + + const handler = this._retrieveHandler(path); + if (!handler) { + this._respondWithError( + getUnimplementedStatusResponse(path), + stream, + channelzSessionInfo + ); + return; + } + + const callEventTracker: CallEventTracker = { + addMessageSent: () => { + if (channelzSessionInfo) { + channelzSessionInfo.messagesSent += 1; + channelzSessionInfo.lastMessageSentTimestamp = new Date(); + } + }, + addMessageReceived: () => { + if (channelzSessionInfo) { + channelzSessionInfo.messagesReceived += 1; + channelzSessionInfo.lastMessageReceivedTimestamp = new Date(); + } + }, + onCallEnd: status => { + if (status.code === Status.OK) { + this.callTracker.addCallSucceeded(); + } else { + this.callTracker.addCallFailed(); + } + }, + onStreamEnd: success => { + if (channelzSessionInfo) { + if (success) { + channelzSessionInfo.streamTracker.addCallSucceeded(); + } else { + channelzSessionInfo.streamTracker.addCallFailed(); + } + } + }, + }; + + const call = getServerInterceptingCall( + this.interceptors, + stream, + headers, + callEventTracker, + handler, + this.options + ); + + if (!this._runHandlerForCall(call, handler)) { + this.callTracker.addCallFailed(); + channelzSessionInfo?.streamTracker.addCallFailed(); + + call.sendStatus({ + code: Status.INTERNAL, + details: `Unknown handler type: ${handler.type}`, + }); + } + }; + } + + private _streamHandler(http2Server: http2.Http2Server) { + return ( + stream: http2.ServerHttp2Stream, + headers: http2.IncomingHttpHeaders + ) => { + // for handling idle timeout + this.onStreamOpened(stream); + + if (this._verifyContentType(stream, headers) !== true) { + return; + } + + const path = headers[HTTP2_HEADER_PATH] as string; + + const handler = this._retrieveHandler(path); + if (!handler) { + this._respondWithError( + getUnimplementedStatusResponse(path), + stream, + null + ); + return; + } + + const call = getServerInterceptingCall( + this.interceptors, + stream, + headers, + null, + handler, + this.options + ); + + if (!this._runHandlerForCall(call, handler)) { + call.sendStatus({ + code: Status.INTERNAL, + details: `Unknown handler type: ${handler.type}`, + }); + } + }; } private _runHandlerForCall( call: ServerInterceptingCallInterface, - handler: Handler + handler: + | UntypedUnaryHandler + | UntypedClientStreamingHandler + | UntypedServerStreamingHandler + | UntypedBidiStreamingHandler ): boolean { const { type } = handler; if (type === 'unary') { - handleUnary(call, handler as UntypedUnaryHandler); + handleUnary(call, handler); } else if (type === 'clientStream') { - handleClientStreaming(call, handler as UntypedClientStreamingHandler); + handleClientStreaming(call, handler); } else if (type === 'serverStream') { - handleServerStreaming(call, handler as UntypedServerStreamingHandler); + handleServerStreaming(call, handler); } else if (type === 'bidi') { - handleBidiStreaming(call, handler as UntypedBidiStreamingHandler); + handleBidiStreaming(call, handler); } else { return false; } @@ -1322,118 +1602,78 @@ export class Server { this.serverAddressString = serverAddressString; const handler = this.channelzEnabled - ? this._channelzHandler - : this._streamHandler; + ? this._channelzHandler(http2Server) + : this._streamHandler(http2Server); - http2Server.on('stream', handler.bind(this)); - http2Server.on('session', session => { - const channelzRef = registerChannelzSocket( - session.socket.remoteAddress ?? 'unknown', - this.getChannelzSessionInfoGetter(session), - this.channelzEnabled + const sessionHandler = this.channelzEnabled + ? this._channelzSessionHandler(http2Server) + : this._sessionHandler(http2Server); + + http2Server.on('stream', handler); + http2Server.on('session', sessionHandler); + } + + private enableIdleTimeout(session: http2.ServerHttp2Session) { + const idleTimeoutObj = { + activeStreams: 0, + timeout: setTimeout( + this.onIdleTimeout, + this.sessionIdleTimeout, + this, + session + ).unref(), + }; + this.sessionIdleTimeouts.set(session, idleTimeoutObj); + + this.trace(`Enable idle timeout for ${session.socket?.remoteAddress}`); + + return idleTimeoutObj; + } + + private onIdleTimeout(ctx: Server, session: http2.ServerHttp2Session) { + ctx.trace(`Idle timeout for ${session.socket?.remoteAddress}`); + ctx.closeSession(session); + } + + private onStreamOpened(stream: http2.ServerHttp2Stream) { + const session = stream.session as http2.ServerHttp2Session; + this.trace(`Stream opened for ${session.socket?.remoteAddress}`); + const idleTimeoutObj = this.sessionIdleTimeouts.get(session); + if (idleTimeoutObj) { + idleTimeoutObj.activeStreams += 1; + if (idleTimeoutObj.timeout) { + clearTimeout(idleTimeoutObj.timeout); + idleTimeoutObj.timeout = null; + } + + this.trace( + `onStreamOpened: adding on stream close event for ${session.socket?.remoteAddress}` ); + stream.once('close', () => this.onStreamClose(session)); + } else { + this.trace( + `onStreamOpened: missing stream for ${session.socket?.remoteAddress}` + ); + } + } - const channelzSessionInfo: ChannelzSessionInfo = { - ref: channelzRef, - streamTracker: new ChannelzCallTracker(), - messagesSent: 0, - messagesReceived: 0, - lastMessageSentTimestamp: null, - lastMessageReceivedTimestamp: null, - }; - - this.http2Servers.get(http2Server)?.sessions.add(session); - this.sessions.set(session, channelzSessionInfo); - const clientAddress = session.socket.remoteAddress; - if (this.channelzEnabled) { - this.channelzTrace.addTrace( - 'CT_INFO', - 'Connection established by client ' + clientAddress + private onStreamClose(session: http2.ServerHttp2Session) { + this.trace(`Stream closed for ${session.socket?.remoteAddress}`); + const idleTimeoutObj = this.sessionIdleTimeouts.get(session); + if (idleTimeoutObj) { + idleTimeoutObj.activeStreams -= 1; + if (idleTimeoutObj.activeStreams === 0) { + this.trace( + `onStreamClose: set idle timeout for ${this.sessionIdleTimeout}ms ${session.socket?.remoteAddress}` ); - this.sessionChildrenTracker.refChild(channelzRef); + idleTimeoutObj.timeout = setTimeout( + this.onIdleTimeout, + this.sessionIdleTimeout, + this, + session + ).unref(); } - let connectionAgeTimer: NodeJS.Timeout | null = null; - let connectionAgeGraceTimer: NodeJS.Timeout | null = null; - let sessionClosedByServer = false; - if (this.maxConnectionAgeMs !== UNLIMITED_CONNECTION_AGE_MS) { - // Apply a random jitter within a +/-10% range - const jitterMagnitude = this.maxConnectionAgeMs / 10; - const jitter = Math.random() * jitterMagnitude * 2 - jitterMagnitude; - connectionAgeTimer = setTimeout(() => { - sessionClosedByServer = true; - if (this.channelzEnabled) { - this.channelzTrace.addTrace( - 'CT_INFO', - 'Connection dropped by max connection age from ' + clientAddress - ); - } - try { - session.goaway( - http2.constants.NGHTTP2_NO_ERROR, - ~(1 << 31), - Buffer.from('max_age') - ); - } catch (e) { - // The goaway can't be sent because the session is already closed - session.destroy(); - return; - } - session.close(); - /* Allow a grace period after sending the GOAWAY before forcibly - * closing the connection. */ - if (this.maxConnectionAgeGraceMs !== UNLIMITED_CONNECTION_AGE_MS) { - connectionAgeGraceTimer = setTimeout(() => { - session.destroy(); - }, this.maxConnectionAgeGraceMs).unref?.(); - } - }, this.maxConnectionAgeMs + jitter).unref?.(); - } - const keeapliveTimeTimer: NodeJS.Timeout | null = setInterval(() => { - const timeoutTImer = setTimeout(() => { - sessionClosedByServer = true; - if (this.channelzEnabled) { - this.channelzTrace.addTrace( - 'CT_INFO', - 'Connection dropped by keepalive timeout from ' + clientAddress - ); - } - session.close(); - }, this.keepaliveTimeoutMs).unref?.(); - try { - session.ping( - (err: Error | null, duration: number, payload: Buffer) => { - clearTimeout(timeoutTImer); - } - ); - } catch (e) { - // The ping can't be sent because the session is already closed - session.destroy(); - } - }, this.keepaliveTimeMs).unref?.(); - session.on('close', () => { - if (this.channelzEnabled) { - if (!sessionClosedByServer) { - this.channelzTrace.addTrace( - 'CT_INFO', - 'Connection dropped by client ' + clientAddress - ); - } - this.sessionChildrenTracker.unrefChild(channelzRef); - unregisterChannelzRef(channelzRef); - } - if (connectionAgeTimer) { - clearTimeout(connectionAgeTimer); - } - if (connectionAgeGraceTimer) { - clearTimeout(connectionAgeGraceTimer); - } - if (keeapliveTimeTimer) { - clearTimeout(keeapliveTimeTimer); - } - this.http2Servers.get(http2Server)?.sessions.delete(session); - this.sessions.delete(session); - }); - }); + } } } diff --git a/packages/grpc-js/src/subchannel-interface.ts b/packages/grpc-js/src/subchannel-interface.ts index c26669ba..6c314189 100644 --- a/packages/grpc-js/src/subchannel-interface.ts +++ b/packages/grpc-js/src/subchannel-interface.ts @@ -15,7 +15,7 @@ * */ -import { SubchannelRef } from './channelz'; +import type { SubchannelRef } from './channelz'; import { ConnectivityState } from './connectivity-state'; import { Subchannel } from './subchannel'; diff --git a/packages/grpc-js/src/subchannel.ts b/packages/grpc-js/src/subchannel.ts index 63e254cf..95b600c4 100644 --- a/packages/grpc-js/src/subchannel.ts +++ b/packages/grpc-js/src/subchannel.ts @@ -31,10 +31,13 @@ import { SubchannelRef, ChannelzTrace, ChannelzChildrenTracker, + ChannelzChildrenTrackerStub, SubchannelInfo, registerChannelzSubchannel, ChannelzCallTracker, + ChannelzCallTrackerStub, unregisterChannelzRef, + ChannelzTraceStub, } from './channelz'; import { ConnectivityStateListener, @@ -89,12 +92,15 @@ export class Subchannel { // Channelz info private readonly channelzEnabled: boolean = true; private channelzRef: SubchannelRef; - private channelzTrace: ChannelzTrace; - private callTracker = new ChannelzCallTracker(); - private childrenTracker = new ChannelzChildrenTracker(); + + private channelzTrace: ChannelzTrace | ChannelzTraceStub; + private callTracker: ChannelzCallTracker | ChannelzCallTrackerStub; + private childrenTracker: + | ChannelzChildrenTracker + | ChannelzChildrenTrackerStub; // Channelz socket info - private streamTracker = new ChannelzCallTracker(); + private streamTracker: ChannelzCallTracker | ChannelzCallTrackerStub; /** * A class representing a connection to a single backend. @@ -127,16 +133,24 @@ export class Subchannel { if (options['grpc.enable_channelz'] === 0) { this.channelzEnabled = false; + this.channelzTrace = new ChannelzTraceStub(); + this.callTracker = new ChannelzCallTrackerStub(); + this.childrenTracker = new ChannelzChildrenTrackerStub(); + this.streamTracker = new ChannelzCallTrackerStub(); + } else { + this.channelzTrace = new ChannelzTrace(); + this.callTracker = new ChannelzCallTracker(); + this.childrenTracker = new ChannelzChildrenTracker(); + this.streamTracker = new ChannelzCallTracker(); } - this.channelzTrace = new ChannelzTrace(); + this.channelzRef = registerChannelzSubchannel( this.subchannelAddressString, () => this.getChannelzInfo(), this.channelzEnabled ); - if (this.channelzEnabled) { - this.channelzTrace.addTrace('CT_INFO', 'Subchannel created'); - } + + this.channelzTrace.addTrace('CT_INFO', 'Subchannel created'); this.trace( 'Subchannel constructed with options ' + JSON.stringify(options, undefined, 2) @@ -338,12 +352,8 @@ export class Subchannel { this.refTrace('refcount ' + this.refcount + ' -> ' + (this.refcount - 1)); this.refcount -= 1; if (this.refcount === 0) { - if (this.channelzEnabled) { - this.channelzTrace.addTrace('CT_INFO', 'Shutting down'); - } - if (this.channelzEnabled) { - unregisterChannelzRef(this.channelzRef); - } + this.channelzTrace.addTrace('CT_INFO', 'Shutting down'); + unregisterChannelzRef(this.channelzRef); process.nextTick(() => { this.transitionToState( [ConnectivityState.CONNECTING, ConnectivityState.READY], diff --git a/packages/grpc-js/src/transport.ts b/packages/grpc-js/src/transport.ts index c4941b06..62048863 100644 --- a/packages/grpc-js/src/transport.ts +++ b/packages/grpc-js/src/transport.ts @@ -28,6 +28,7 @@ import { ChannelCredentials } from './channel-credentials'; import { ChannelOptions } from './channel-options'; import { ChannelzCallTracker, + ChannelzCallTrackerStub, registerChannelzSocket, SocketInfo, SocketRef, @@ -136,7 +137,7 @@ class Http2Transport implements Transport { // Channelz info private channelzRef: SocketRef; private readonly channelzEnabled: boolean = true; - private streamTracker = new ChannelzCallTracker(); + private streamTracker: ChannelzCallTracker | ChannelzCallTrackerStub; private keepalivesSent = 0; private messagesSent = 0; private messagesReceived = 0; @@ -159,12 +160,17 @@ class Http2Transport implements Transport { if (options['grpc.enable_channelz'] === 0) { this.channelzEnabled = false; + this.streamTracker = new ChannelzCallTrackerStub(); + } else { + this.streamTracker = new ChannelzCallTracker(); } + this.channelzRef = registerChannelzSocket( this.subchannelAddressString, () => this.getChannelzInfo(), this.channelzEnabled ); + // Build user-agent string. this.userAgent = [ options['grpc.primary_user_agent'], @@ -192,6 +198,7 @@ class Http2Transport implements Transport { this.stopKeepalivePings(); this.handleDisconnect(); }); + session.once( 'goaway', (errorCode: number, lastStreamID: number, opaqueData?: Buffer) => { @@ -214,11 +221,13 @@ class Http2Transport implements Transport { this.reportDisconnectToOwner(tooManyPings); } ); + session.once('error', error => { /* Do nothing here. Any error should also trigger a close event, which is * where we want to handle that. */ this.trace('connection closed with error ' + (error as Error).message); }); + if (logging.isTracerEnabled(TRACER_NAME)) { session.on('remoteSettings', (settings: http2.Settings) => { this.trace( @@ -237,6 +246,7 @@ class Http2Transport implements Transport { ); }); } + /* Start the keepalive timer last, because this can trigger trace logs, * which should only happen after everything else is set up. */ if (this.keepaliveWithoutCalls) { @@ -625,6 +635,7 @@ export class Http2SubchannelConnector implements SubchannelConnector { private session: http2.ClientHttp2Session | null = null; private isShutdown = false; constructor(private channelTarget: GrpcUri) {} + private trace(text: string) { logging.trace( LogVerbosity.DEBUG, @@ -632,6 +643,7 @@ export class Http2SubchannelConnector implements SubchannelConnector { uriToString(this.channelTarget) + ' ' + text ); } + private createSession( address: SubchannelAddress, credentials: ChannelCredentials, @@ -641,6 +653,7 @@ export class Http2SubchannelConnector implements SubchannelConnector { if (this.isShutdown) { return Promise.reject(); } + return new Promise((resolve, reject) => { let remoteName: string | null; if (proxyConnectionResult.realTarget) { @@ -767,6 +780,7 @@ export class Http2SubchannelConnector implements SubchannelConnector { }); }); } + connect( address: SubchannelAddress, credentials: ChannelCredentials, diff --git a/packages/grpc-js/test/common.ts b/packages/grpc-js/test/common.ts index 88aa129a..eaa701f1 100644 --- a/packages/grpc-js/test/common.ts +++ b/packages/grpc-js/test/common.ts @@ -31,7 +31,7 @@ import { HealthListener, SubchannelInterface, } from '../src/subchannel-interface'; -import { SubchannelRef } from '../src/channelz'; +import { EntityTypes, SubchannelRef } from '../src/channelz'; import { Subchannel } from '../src/subchannel'; import { ConnectivityState } from '../src/connectivity-state'; @@ -196,7 +196,7 @@ export class MockSubchannel implements SubchannelInterface { unref(): void {} getChannelzRef(): SubchannelRef { return { - kind: 'subchannel', + kind: EntityTypes.subchannel, id: -1, name: this.address, };