From 74fdd85123acac606659cf07bb1d1bdebf836439 Mon Sep 17 00:00:00 2001 From: Michael Lumish Date: Mon, 13 Sep 2021 10:58:12 -0700 Subject: [PATCH] Add channelz service implementation --- packages/grpc-js/src/channel.ts | 3 + packages/grpc-js/src/channelz.ts | 375 ++++++++++++++++++++++++++++- packages/grpc-js/src/server.ts | 14 +- packages/grpc-js/src/subchannel.ts | 5 +- 4 files changed, 387 insertions(+), 10 deletions(-) diff --git a/packages/grpc-js/src/channel.ts b/packages/grpc-js/src/channel.ts index 4a6e8814..c5f7352e 100644 --- a/packages/grpc-js/src/channel.ts +++ b/packages/grpc-js/src/channel.ts @@ -163,6 +163,7 @@ export class ChannelImplementation implements Channel { private configSelector: ConfigSelector | null = null; // Channelz info + private originalTarget: string; private channelzRef: ChannelRef; private channelzTrace: ChannelzTrace; private callTracker = new ChannelzCallTracker(); @@ -196,6 +197,7 @@ export class ChannelImplementation implements Channel { ); } } + this.originalTarget = target; const originalTargetUri = parseUri(target); if (originalTargetUri === null) { throw new Error(`Could not parse target name "${target}"`); @@ -320,6 +322,7 @@ export class ChannelImplementation implements Channel { private getChannelzInfo(): ChannelInfo { return { + target: this.originalTarget, state: this.connectivityState, trace: this.channelzTrace, callTracker: this.callTracker, diff --git a/packages/grpc-js/src/channelz.ts b/packages/grpc-js/src/channelz.ts index 3edbd7bb..7965ad3c 100644 --- a/packages/grpc-js/src/channelz.ts +++ b/packages/grpc-js/src/channelz.ts @@ -17,7 +17,39 @@ import { isIPv4, isIPv6 } from "net"; import { ConnectivityState } from "./connectivity-state"; -import { SubchannelAddress } from "./subchannel-address"; +import { Status } from "./constants"; +import { Timestamp } from "./generated/google/protobuf/Timestamp"; +import { Channel as ChannelMessage } from "./generated/grpc/channelz/v1/Channel"; +import { ChannelConnectivityState__Output } from "./generated/grpc/channelz/v1/ChannelConnectivityState"; +import { ChannelRef as ChannelRefMessage } from "./generated/grpc/channelz/v1/ChannelRef"; +import { ChannelTrace } from "./generated/grpc/channelz/v1/ChannelTrace"; +import { GetChannelRequest__Output } from "./generated/grpc/channelz/v1/GetChannelRequest"; +import { GetChannelResponse } from "./generated/grpc/channelz/v1/GetChannelResponse"; +import { sendUnaryData, ServerUnaryCall } from "./server-call"; +import { ServerRef as ServerRefMessage } from "./generated/grpc/channelz/v1/ServerRef"; +import { SocketRef as SocketRefMessage } from "./generated/grpc/channelz/v1/SocketRef"; +import { isTcpSubchannelAddress, SubchannelAddress } from "./subchannel-address"; +import { SubchannelRef as SubchannelRefMessage } from "./generated/grpc/channelz/v1/SubchannelRef"; +import { GetServerRequest__Output } from "./generated/grpc/channelz/v1/GetServerRequest"; +import { GetServerResponse } from "./generated/grpc/channelz/v1/GetServerResponse"; +import { Server as ServerMessage } from "./generated/grpc/channelz/v1/Server"; +import { GetServersRequest__Output } from "./generated/grpc/channelz/v1/GetServersRequest"; +import { GetServersResponse } from "./generated/grpc/channelz/v1/GetServersResponse"; +import { GetTopChannelsRequest__Output } from "./generated/grpc/channelz/v1/GetTopChannelsRequest"; +import { GetTopChannelsResponse } from "./generated/grpc/channelz/v1/GetTopChannelsResponse"; +import { GetSubchannelRequest__Output } from "./generated/grpc/channelz/v1/GetSubchannelRequest"; +import { GetSubchannelResponse } from "./generated/grpc/channelz/v1/GetSubchannelResponse"; +import { Subchannel as SubchannelMessage } from "./generated/grpc/channelz/v1/Subchannel"; +import { GetSocketRequest__Output } from "./generated/grpc/channelz/v1/GetSocketRequest"; +import { GetSocketResponse } from "./generated/grpc/channelz/v1/GetSocketResponse"; +import { Socket as SocketMessage } from "./generated/grpc/channelz/v1/Socket"; +import { Address } from "./generated/grpc/channelz/v1/Address"; +import { Security } from "./generated/grpc/channelz/v1/Security"; +import { GetServerSocketsRequest__Output } from "./generated/grpc/channelz/v1/GetServerSocketsRequest"; +import { GetServerSocketsResponse } from "./generated/grpc/channelz/v1/GetServerSocketsResponse"; +import { ChannelzDefinition, ChannelzHandlers } from "./generated/grpc/channelz/v1/Channelz"; +import { ProtoGrpcType as ChannelzProtoGrpcType } from "./generated/channelz"; +import type { loadSync } from '@grpc/proto-loader'; export type TraceSeverity = 'CT_UNKNOWN' | 'CT_INFO' | 'CT_WARNING' | 'CT_ERROR'; @@ -44,6 +76,33 @@ export interface SocketRef { name: string; } +function channelRefToMessage(ref: ChannelRef): ChannelRefMessage { + return { + channel_id: ref.id, + name: ref.name + }; +} + +function subchannelRefToMessage(ref: SubchannelRef): SubchannelRefMessage { + return { + subchannel_id: ref.id, + name: ref.name + } +} + +function serverRefToMessage(ref: ServerRef): ServerRefMessage { + return { + server_id: ref.id + } +} + +function socketRefToMessage(ref: SocketRef): SocketRefMessage { + return { + socket_id: ref.id, + name: ref.name + } +} + interface TraceEvent { description: string; severity: TraceSeverity; @@ -72,6 +131,22 @@ export class ChannelzTrace { }); this.eventsLogged += 1; } + + getTraceMessage(): ChannelTrace { + return { + creation_timestamp: dateToProtoTimestamp(this.creationTimestamp), + num_events_logged: this.eventsLogged, + events: this.events.map(event => { + return { + description: event.description, + severity: event.severity, + timestamp: dateToProtoTimestamp(event.timestamp), + channel_ref: event.childChannel ? channelRefToMessage(event.childChannel) : null, + subchannel_ref: event.childSubchannel ? subchannelRefToMessage(event.childSubchannel) : null + } + }) + }; + } } export class ChannelzChildrenTracker { @@ -185,6 +260,7 @@ export interface ChannelzChildren { } export interface ChannelInfo { + target: string; state: ConnectivityState; trace: ChannelzTrace; callTracker: ChannelzCallTracker; @@ -196,7 +272,8 @@ export interface SubchannelInfo extends ChannelInfo {} export interface ServerInfo { trace: ChannelzTrace; callTracker: ChannelzCallTracker; - children: ChannelzChildren; + listenerChildren: ChannelzChildren; + sessionChildren: ChannelzChildren; } export interface TlsInfo { @@ -342,4 +419,298 @@ function ipAddressStringToBuffer(ipAddress: string): Buffer | null { } else { return null; } +} + +function connectivityStateToMessage(state: ConnectivityState): ChannelConnectivityState__Output { + switch (state) { + case ConnectivityState.CONNECTING: + return { + state: 'CONNECTING' + }; + case ConnectivityState.IDLE: + return { + state: 'IDLE' + }; + case ConnectivityState.READY: + return { + state: 'READY' + }; + case ConnectivityState.SHUTDOWN: + return { + state: 'SHUTDOWN' + }; + case ConnectivityState.TRANSIENT_FAILURE: + return { + state: 'TRANSIENT_FAILURE' + }; + default: + return { + state: 'UNKNOWN' + }; + } +} + +function dateToProtoTimestamp(date?: Date | null): Timestamp | null { + if (!date) { + return null; + } + return { + seconds: date.getSeconds(), + nanos: date.getMilliseconds() * 1_000_000 + } +} + +function getChannelMessage(channelEntry: ChannelEntry): ChannelMessage { + const resolvedInfo = channelEntry.getInfo(); + return { + ref: channelRefToMessage(channelEntry.ref), + data: { + target: resolvedInfo.target, + state: connectivityStateToMessage(resolvedInfo.state), + calls_started: resolvedInfo.callTracker.callsStarted, + calls_succeeded: resolvedInfo.callTracker.callsSucceeded, + calls_failed: resolvedInfo.callTracker.callsFailed, + last_call_started_timestamp: dateToProtoTimestamp(resolvedInfo.callTracker.lastCallStartedTimestamp), + trace: resolvedInfo.trace.getTraceMessage() + }, + channel_ref: resolvedInfo.children.channels.map(ref => channelRefToMessage(ref)), + subchannel_ref: resolvedInfo.children.subchannels.map(ref => subchannelRefToMessage(ref)) + }; +} + +function GetChannel(call: ServerUnaryCall, callback: sendUnaryData): void { + const channelId = Number.parseInt(call.request.channel_id); + const channelEntry = channels[channelId]; + if (channelEntry === undefined) { + callback({ + 'code': Status.NOT_FOUND, + 'details': 'No channel data found for id ' + channelId + }); + return; + } + callback(null, {channel: getChannelMessage(channelEntry)}); +} + +function GetTopChannels(call: ServerUnaryCall, callback: sendUnaryData): void { + const maxResults = Number.parseInt(call.request.max_results); + const resultList: ChannelMessage[] = []; + let i = Number.parseInt(call.request.start_channel_id); + for (; i < channels.length; i++) { + const channelEntry = channels[i]; + if (channelEntry === undefined) { + continue; + } + resultList.push(getChannelMessage(channelEntry)); + if (resultList.length >= maxResults) { + break; + } + } + callback(null, { + channel: resultList, + end: i >= servers.length + }); +} + +function getServerMessage(serverEntry: ServerEntry): ServerMessage { + const resolvedInfo = serverEntry.getInfo(); + return { + ref: serverRefToMessage(serverEntry.ref), + data: { + calls_started: resolvedInfo.callTracker.callsStarted, + calls_succeeded: resolvedInfo.callTracker.callsSucceeded, + calls_failed: resolvedInfo.callTracker.callsFailed, + last_call_started_timestamp: dateToProtoTimestamp(resolvedInfo.callTracker.lastCallStartedTimestamp), + trace: resolvedInfo.trace.getTraceMessage() + }, + listen_socket: resolvedInfo.listenerChildren.sockets.map(ref => socketRefToMessage(ref)) + }; +} + +function GetServer(call: ServerUnaryCall, callback: sendUnaryData): void { + const serverId = Number.parseInt(call.request.server_id); + const serverEntry = servers[serverId]; + if (serverEntry === undefined) { + callback({ + 'code': Status.NOT_FOUND, + 'details': 'No server data found for id ' + serverId + }); + return; + } + callback(null, {server: getServerMessage(serverEntry)}); +} + +function GetServers(call: ServerUnaryCall, callback: sendUnaryData): void { + const maxResults = Number.parseInt(call.request.max_results); + const resultList: ServerMessage[] = []; + let i = Number.parseInt(call.request.start_server_id); + for (; i < servers.length; i++) { + const serverEntry = servers[i]; + if (serverEntry === undefined) { + continue; + } + resultList.push(getServerMessage(serverEntry)); + if (resultList.length >= maxResults) { + break; + } + } + callback(null, { + server: resultList, + end: i >= servers.length + }); +} + +function GetSubchannel(call: ServerUnaryCall, callback: sendUnaryData): void { + const subchannelId = Number.parseInt(call.request.subchannel_id); + const subchannelEntry = subchannels[subchannelId]; + if (subchannelEntry === undefined) { + callback({ + 'code': Status.NOT_FOUND, + 'details': 'No subchannel data found for id ' + subchannelId + }); + return; + } + const resolvedInfo = subchannelEntry.getInfo(); + const subchannelMessage: SubchannelMessage = { + ref: subchannelRefToMessage(subchannelEntry.ref), + data: { + target: resolvedInfo.target, + state: connectivityStateToMessage(resolvedInfo.state), + calls_started: resolvedInfo.callTracker.callsStarted, + calls_succeeded: resolvedInfo.callTracker.callsSucceeded, + calls_failed: resolvedInfo.callTracker.callsFailed, + last_call_started_timestamp: dateToProtoTimestamp(resolvedInfo.callTracker.lastCallStartedTimestamp), + trace: resolvedInfo.trace.getTraceMessage() + }, + socket_ref: resolvedInfo.children.sockets.map(ref => socketRefToMessage(ref)) + }; + callback(null, {subchannel: subchannelMessage}); +} + +function subchannelAddressToAddressMessage(subchannelAddress: SubchannelAddress): Address { + if (isTcpSubchannelAddress(subchannelAddress)) { + return { + address: 'tcpip_address', + tcpip_address: { + ip_address: ipAddressStringToBuffer(subchannelAddress.host) ?? undefined, + port: subchannelAddress.port + } + }; + } else { + return { + address: 'uds_address', + uds_address: { + filename: subchannelAddress.path + } + }; + } +} + +function GetSocket(call: ServerUnaryCall, callback: sendUnaryData): void { + const socketId = Number.parseInt(call.request.socket_id); + const socketEntry = sockets[socketId]; + if (socketEntry === undefined) { + callback({ + 'code': Status.NOT_FOUND, + 'details': 'No socket data found for id ' + socketId + }); + return; + } + const resolvedInfo = socketEntry.getInfo(); + const securityMessage: Security | null = resolvedInfo.security ? { + model: 'tls', + tls: { + cipher_suite: resolvedInfo.security.cipherSuiteStandardName ? 'standard_name' : 'other_name', + standard_name: resolvedInfo.security.cipherSuiteStandardName ?? undefined, + other_name: resolvedInfo.security.cipherSuiteOtherName ?? undefined, + local_certificate: resolvedInfo.security.localCertificate ?? undefined, + remote_certificate: resolvedInfo.security.remoteCertificate ?? undefined + } + } : null; + const socketMessage: SocketMessage = { + ref: socketRefToMessage(socketEntry.ref), + local: subchannelAddressToAddressMessage(resolvedInfo.localAddress), + remote: resolvedInfo.remoteAddress ? subchannelAddressToAddressMessage(resolvedInfo.remoteAddress) : null, + remote_name: resolvedInfo.remoteName ?? undefined, + security: securityMessage, + data: { + keep_alives_sent: resolvedInfo.keepAlivesSent, + streams_started: resolvedInfo.streamsStarted, + streams_succeeded: resolvedInfo.streamsSucceeded, + streams_failed: resolvedInfo.streamsFailed, + last_local_stream_created_timestamp: dateToProtoTimestamp(resolvedInfo.lastLocalStreamCreatedTimestamp), + last_remote_stream_created_timestamp: dateToProtoTimestamp(resolvedInfo.lastRemoteStreamCreatedTimestamp), + messages_received: resolvedInfo.messagesReceived, + messages_sent: resolvedInfo.messagesSent, + last_message_received_timestamp: dateToProtoTimestamp(resolvedInfo.lastMessageReceivedTimestamp), + last_message_sent_timestamp: dateToProtoTimestamp(resolvedInfo.lastMessageSentTimestamp), + local_flow_control_window: resolvedInfo.localFlowControlWindow ? { value: resolvedInfo.localFlowControlWindow } : null, + remote_flow_control_window: resolvedInfo.remoteFlowControlWindow ? { value: resolvedInfo.remoteFlowControlWindow } : null, + } + }; + callback(null, {socket: socketMessage}); +} + +function GetServerSockets(call: ServerUnaryCall, callback: sendUnaryData): void { + const serverId = Number.parseInt(call.request.server_id); + const serverEntry = servers[serverId]; + if (serverEntry === undefined) { + callback({ + 'code': Status.NOT_FOUND, + 'details': 'No server data found for id ' + serverId + }); + return; + } + const startId = Number.parseInt(call.request.start_socket_id); + const maxResults = Number.parseInt(call.request.max_results); + const resolvedInfo = serverEntry.getInfo(); + const allSockets = resolvedInfo.listenerChildren.sockets.concat(resolvedInfo.sessionChildren.sockets).sort((ref1, ref2) => ref1.id - ref2.id); + const resultList: SocketRefMessage[] = []; + let i = 0; + for (; i < allSockets.length; i++) { + if (allSockets[i].id >= startId) { + resultList.push(socketRefToMessage(allSockets[i])); + if (resultList.length >= maxResults) { + break; + } + } + } + callback(null, { + socket_ref: resultList, + end: i >= allSockets.length + }); +} + +export function getChannelzHandlers(): ChannelzHandlers { + return { + GetChannel, + GetTopChannels, + GetServer, + GetServers, + GetSubchannel, + GetSocket, + GetServerSockets + }; +} + +let loadedChannelzDefinition: ChannelzDefinition | null = null; + +export function getChannelzServiceDefinition(): ChannelzDefinition { + if (loadedChannelzDefinition) { + return loadedChannelzDefinition; + } + /* The purpose of this complexity is to avoid loading @grpc/proto-loader at + * runtime for users who will not use/enable channelz. */ + const loaderLoadSync = require('@grpc/proto-loader').loadSync as typeof loadSync; + const loadedProto = loaderLoadSync('channelz.proto', { + keepCase: true, + longs: String, + enums: String, + defaults: true, + oneofs: true, + includeDirs: [ + '../../proto' + ] + }) as unknown as ChannelzProtoGrpcType; + loadedChannelzDefinition = loadedProto.grpc.channelz.v1.Channelz.service; + return loadedChannelzDefinition; } \ No newline at end of file diff --git a/packages/grpc-js/src/server.ts b/packages/grpc-js/src/server.ts index 04c24c07..ef31dadc 100644 --- a/packages/grpc-js/src/server.ts +++ b/packages/grpc-js/src/server.ts @@ -156,7 +156,8 @@ export class Server { private channelzRef: ServerRef; private channelzTrace = new ChannelzTrace(); private callTracker = new ChannelzCallTracker(); - private childrenTracker = new ChannelzChildrenTracker(); + private listenerChildrenTracker = new ChannelzChildrenTracker(); + private sessionChildrenTracker = new ChannelzChildrenTracker(); constructor(options?: ChannelOptions) { this.options = options ?? {}; @@ -168,7 +169,8 @@ export class Server { return { trace: this.channelzTrace, callTracker: this.callTracker, - children: this.childrenTracker.getChildLists() + listenerChildren: this.listenerChildrenTracker.getChildLists(), + sessionChildren: this.sessionChildrenTracker.getChildLists() }; } @@ -568,7 +570,7 @@ export class Server { for (const {server: http2Server, channelzRef: ref} of this.http2ServerList) { if (http2Server.listening) { http2Server.close(() => { - this.childrenTracker.unrefChild(ref); + this.listenerChildrenTracker.unrefChild(ref); unregisterChannelzRef(ref); }); } @@ -652,7 +654,7 @@ export class Server { if (http2Server.listening) { pendingChecks++; http2Server.close(() => { - this.childrenTracker.unrefChild(ref); + this.listenerChildrenTracker.unrefChild(ref); unregisterChannelzRef(ref); maybeCallback(); }); @@ -826,10 +828,10 @@ export class Server { this.sessions.set(session, channelzSessionInfo); const clientAddress = session.socket.remoteAddress; this.channelzTrace.addTrace('CT_INFO', 'Connection established by client ' + clientAddress); - this.childrenTracker.refChild(channelzRef); + this.sessionChildrenTracker.refChild(channelzRef); session.on('close', () => { this.channelzTrace.addTrace('CT_INFO', 'Connection dropped by client ' + clientAddress); - this.childrenTracker.unrefChild(channelzRef); + this.sessionChildrenTracker.unrefChild(channelzRef); unregisterChannelzRef(channelzRef); this.sessions.delete(session); }); diff --git a/packages/grpc-js/src/subchannel.ts b/packages/grpc-js/src/subchannel.ts index 1caf94ba..e7e7a9f0 100644 --- a/packages/grpc-js/src/subchannel.ts +++ b/packages/grpc-js/src/subchannel.ts @@ -266,7 +266,8 @@ export class Subchannel { state: this.connectivityState, trace: this.channelzTrace, callTracker: this.callTracker, - children: this.childrenTracker.getChildLists() + children: this.childrenTracker.getChildLists(), + target: this.subchannelAddressString }; } @@ -285,7 +286,7 @@ export class Subchannel { const peerCertificate = tlsSocket.getPeerCertificate(); tlsInfo = { cipherSuiteStandardName: cipherInfo.standardName ?? null, - cipherSuiteOtherName: cipherInfo.standardName ? cipherInfo.name: null, + cipherSuiteOtherName: cipherInfo.standardName ? null : cipherInfo.name, localCertificate: (certificate && 'raw' in certificate) ? certificate.raw : null, remoteCertificate: (peerCertificate && 'raw' in peerCertificate) ? peerCertificate.raw : null };