From cf665757d0ed764ecc75fc3b42f9b2dc2f3bca84 Mon Sep 17 00:00:00 2001 From: murgatroid99 Date: Mon, 9 Dec 2019 13:53:04 -0800 Subject: [PATCH] grpc-js: Add low-level call tracers --- packages/grpc-js/package.json | 2 +- packages/grpc-js/src/call-stream.ts | 67 +++++++++++++++++++++++++++++ packages/grpc-js/src/channel.ts | 5 +++ packages/grpc-js/src/subchannel.ts | 28 ++++++++++++ 4 files changed, 101 insertions(+), 1 deletion(-) diff --git a/packages/grpc-js/package.json b/packages/grpc-js/package.json index c5574ebe..056f36a7 100644 --- a/packages/grpc-js/package.json +++ b/packages/grpc-js/package.json @@ -1,6 +1,6 @@ { "name": "@grpc/grpc-js", - "version": "0.6.13", + "version": "0.6.14", "description": "gRPC Library for Node - pure JS implementation", "homepage": "https://grpc.io/", "repository": "https://github.com/grpc/grpc-node/tree/master/packages/grpc-js", diff --git a/packages/grpc-js/src/call-stream.ts b/packages/grpc-js/src/call-stream.ts index d8655cf7..c2ddc4b7 100644 --- a/packages/grpc-js/src/call-stream.ts +++ b/packages/grpc-js/src/call-stream.ts @@ -28,6 +28,14 @@ import { ObjectDuplex, WriteCallback } from './object-stream'; import { StreamDecoder } from './stream-decoder'; import { ChannelImplementation } from './channel'; import { Subchannel } from './subchannel'; +import * as logging from './logging'; +import { LogVerbosity } from './constants'; + +const TRACER_NAME = 'call_stream'; + +function trace(text: string): void { + logging.trace(LogVerbosity.DEBUG, TRACER_NAME, text); +} const { HTTP2_HEADER_STATUS, @@ -135,6 +143,16 @@ export class Http2CallStream extends Duplex implements Call { }; } + // tslint:disable-next-line:no-any + push(chunk: any, encoding?: string): boolean { + trace( + this.methodName + + 'pushing to reader message of length ' + + (chunk instanceof Buffer ? chunk.length : null) + ); + return super.push(chunk); + } + /** * On first call, emits a 'status' event with the given StatusObject. * Subsequent calls are no-ops. @@ -142,6 +160,14 @@ export class Http2CallStream extends Duplex implements Call { */ private endCall(status: StatusObject): void { if (this.finalStatus === null) { + trace( + this.methodName + + ' ended with status: code=' + + status.code + + ' details="' + + status.details + + '"' + ); this.finalStatus = status; /* We do this asynchronously to ensure that no async function is in the * call stack when we return control to the application. If an async @@ -177,6 +203,11 @@ export class Http2CallStream extends Duplex implements Call { (this.http2Stream as http2.ClientHttp2Stream).pause(); } } else { + trace( + this.methodName + + ' unpushedReadMessages.push message of length ' + + message.length + ); this.unpushedReadMessages.push(message); } if (this.unfilteredReadMessages.length > 0) { @@ -201,6 +232,11 @@ export class Http2CallStream extends Duplex implements Call { } return; } + trace( + this.methodName + + ' filterReceivedMessage of length ' + + framedMessage.length + ); this.isReadFilterPending = true; this.filterStack .receiveMessage(Promise.resolve(framedMessage)) @@ -212,6 +248,11 @@ export class Http2CallStream extends Duplex implements Call { private tryPush(messageBytes: Buffer | null): void { if (this.isReadFilterPending) { + trace( + this.methodName + + ' unfilteredReadMessages.push message of length ' + + (messageBytes && messageBytes.length) + ); this.unfilteredReadMessages.push(messageBytes); } else { this.filterReceivedMessage(messageBytes); @@ -219,6 +260,7 @@ export class Http2CallStream extends Duplex implements Call { } private handleTrailers(headers: http2.IncomingHttpHeaders) { + trace(this.methodName + ' received HTTP/2 trailing headers frame'); const code: Status = this.mappedStatusCode; const details = ''; let metadata: Metadata; @@ -261,11 +303,17 @@ export class Http2CallStream extends Duplex implements Call { if (this.finalStatus !== null) { stream.close(NGHTTP2_CANCEL); } else { + trace( + this.methodName + + ' attachHttp2Stream from subchannel ' + + subchannel.getAddress() + ); this.http2Stream = stream; this.subchannel = subchannel; subchannel.addDisconnectListener(this.disconnectListener); subchannel.callRef(); stream.on('response', (headers, flags) => { + trace(this.methodName + ' received HTTP/2 headers frame'); switch (headers[':status']) { // TODO(murgatroid99): handle 100 and 101 case 400: @@ -321,16 +369,28 @@ export class Http2CallStream extends Duplex implements Call { }); stream.on('trailers', this.handleTrailers.bind(this)); stream.on('data', (data: Buffer) => { + trace( + this.methodName + + ' receive HTTP/2 data frame of length ' + + data.length + ); const messages = this.decoder.write(data); for (const message of messages) { + trace( + this.methodName + ' parsed message of length ' + message.length + ); this.tryPush(message); } }); stream.on('end', () => { + trace(this.methodName + ' received HTTP/2 end of data flag'); this.tryPush(null); }); stream.on('close', async () => { + trace( + this.methodName + ' HTTP/2 stream closed with code ' + stream.rstCode + ); let code: Status; let details = ''; switch (stream.rstCode) { @@ -383,6 +443,7 @@ export class Http2CallStream extends Duplex implements Call { } sendMetadata(metadata: Metadata): void { + trace(this.methodName + ' Sending metadata'); this.channel._startCallStream(this, metadata); } @@ -461,6 +522,11 @@ export class Http2CallStream extends Duplex implements Call { } _write(chunk: WriteObject, encoding: string, cb: WriteCallback) { + trace( + this.methodName + + ' write() called with message of length ' + + chunk.message.length + ); this.filterStack.sendMessage(Promise.resolve(chunk)).then(message => { if (this.http2Stream === null) { this.pendingWrite = message.message; @@ -472,6 +538,7 @@ export class Http2CallStream extends Duplex implements Call { } _final(cb: Function) { + trace(this.methodName + ' end() called'); if (this.http2Stream === null) { this.pendingFinalCallback = cb; } else { diff --git a/packages/grpc-js/src/channel.ts b/packages/grpc-js/src/channel.ts index 6a859ed2..8e45ccb4 100644 --- a/packages/grpc-js/src/channel.ts +++ b/packages/grpc-js/src/channel.ts @@ -357,6 +357,11 @@ export class ChannelImplementation implements Channel { if (this.connectivityState === ConnectivityState.SHUTDOWN) { throw new Error('Channel has been shut down'); } + trace( + LogVerbosity.DEBUG, + 'channel', + 'createCall(method="' + method + '", deadline=' + deadline + ')' + ); const finalOptions: CallStreamOptions = { deadline: deadline === null || deadline === undefined ? Infinity : deadline, diff --git a/packages/grpc-js/src/subchannel.ts b/packages/grpc-js/src/subchannel.ts index eae178e7..1d02591d 100644 --- a/packages/grpc-js/src/subchannel.ts +++ b/packages/grpc-js/src/subchannel.ts @@ -395,6 +395,13 @@ export class Subchannel { } callRef() { + trace( + this.subchannelAddress + + ' callRefcount ' + + this.callRefcount + + ' -> ' + + (this.callRefcount + 1) + ); if (this.callRefcount === 0) { if (this.session) { this.session.ref(); @@ -405,6 +412,13 @@ export class Subchannel { } callUnref() { + trace( + this.subchannelAddress + + ' callRefcount ' + + this.callRefcount + + ' -> ' + + (this.callRefcount - 1) + ); this.callRefcount -= 1; if (this.callRefcount === 0) { if (this.session) { @@ -416,10 +430,24 @@ export class Subchannel { } ref() { + trace( + this.subchannelAddress + + ' callRefcount ' + + this.refcount + + ' -> ' + + (this.refcount + 1) + ); this.refcount += 1; } unref() { + trace( + this.subchannelAddress + + ' callRefcount ' + + this.refcount + + ' -> ' + + (this.refcount - 1) + ); this.refcount -= 1; this.checkBothRefcounts(); }