diff --git a/packages/grpc-js/src/call-stream.ts b/packages/grpc-js/src/call-stream.ts index c2ddc4b7..c1e9b3b5 100644 --- a/packages/grpc-js/src/call-stream.ts +++ b/packages/grpc-js/src/call-stream.ts @@ -33,10 +33,6 @@ import { LogVerbosity } from './constants'; const TRACER_NAME = 'call_stream'; -function trace(text: string): void { - logging.trace(LogVerbosity.DEBUG, TRACER_NAME, text); -} - const { HTTP2_HEADER_STATUS, HTTP2_HEADER_CONTENT_TYPE, @@ -129,7 +125,8 @@ export class Http2CallStream extends Duplex implements Call { private readonly channel: ChannelImplementation, private readonly options: CallStreamOptions, filterStackFactory: FilterStackFactory, - private readonly channelCallCredentials: CallCredentials + private readonly channelCallCredentials: CallCredentials, + private readonly callNumber: number ) { super({ objectMode: true }); this.filterStack = filterStackFactory.createFilter(this); @@ -143,11 +140,18 @@ export class Http2CallStream extends Duplex implements Call { }; } + private trace(text: string): void { + logging.trace( + LogVerbosity.DEBUG, + TRACER_NAME, + '[' + this.callNumber + '] ' + text + ); + } + // tslint:disable-next-line:no-any push(chunk: any, encoding?: string): boolean { - trace( - this.methodName + - 'pushing to reader message of length ' + + this.trace( + 'pushing to reader message of length ' + (chunk instanceof Buffer ? chunk.length : null) ); return super.push(chunk); @@ -160,9 +164,8 @@ export class Http2CallStream extends Duplex implements Call { */ private endCall(status: StatusObject): void { if (this.finalStatus === null) { - trace( - this.methodName + - ' ended with status: code=' + + this.trace( + 'ended with status: code=' + status.code + ' details="' + status.details + @@ -203,10 +206,8 @@ export class Http2CallStream extends Duplex implements Call { (this.http2Stream as http2.ClientHttp2Stream).pause(); } } else { - trace( - this.methodName + - ' unpushedReadMessages.push message of length ' + - message.length + this.trace( + 'unpushedReadMessages.push message of length ' + message.length ); this.unpushedReadMessages.push(message); } @@ -232,11 +233,7 @@ export class Http2CallStream extends Duplex implements Call { } return; } - trace( - this.methodName + - ' filterReceivedMessage of length ' + - framedMessage.length - ); + this.trace('filterReceivedMessage of length ' + framedMessage.length); this.isReadFilterPending = true; this.filterStack .receiveMessage(Promise.resolve(framedMessage)) @@ -248,9 +245,10 @@ export class Http2CallStream extends Duplex implements Call { private tryPush(messageBytes: Buffer | null): void { if (this.isReadFilterPending) { - trace( - this.methodName + - ' unfilteredReadMessages.push message of length ' + + this.trace( + '[' + + this.callNumber + + '] unfilteredReadMessages.push message of length ' + (messageBytes && messageBytes.length) ); this.unfilteredReadMessages.push(messageBytes); @@ -260,7 +258,7 @@ export class Http2CallStream extends Duplex implements Call { } private handleTrailers(headers: http2.IncomingHttpHeaders) { - trace(this.methodName + ' received HTTP/2 trailing headers frame'); + this.trace('received HTTP/2 trailing headers frame'); const code: Status = this.mappedStatusCode; const details = ''; let metadata: Metadata; @@ -303,17 +301,15 @@ export class Http2CallStream extends Duplex implements Call { if (this.finalStatus !== null) { stream.close(NGHTTP2_CANCEL); } else { - trace( - this.methodName + - ' attachHttp2Stream from subchannel ' + - subchannel.getAddress() + this.trace( + 'attachHttp2Stream from subchannel ' + subchannel.getAddress() ); this.http2Stream = stream; this.subchannel = subchannel; subchannel.addDisconnectListener(this.disconnectListener); subchannel.callRef(); stream.on('response', (headers, flags) => { - trace(this.methodName + ' received HTTP/2 headers frame'); + this.trace('received HTTP/2 headers frame'); switch (headers[':status']) { // TODO(murgatroid99): handle 100 and 101 case 400: @@ -369,28 +365,20 @@ export class Http2CallStream extends Duplex implements Call { }); stream.on('trailers', this.handleTrailers.bind(this)); stream.on('data', (data: Buffer) => { - trace( - this.methodName + - ' receive HTTP/2 data frame of length ' + - data.length - ); + this.trace('receive HTTP/2 data frame of length ' + data.length); const messages = this.decoder.write(data); for (const message of messages) { - trace( - this.methodName + ' parsed message of length ' + message.length - ); + this.trace('parsed message of length ' + message.length); this.tryPush(message); } }); stream.on('end', () => { - trace(this.methodName + ' received HTTP/2 end of data flag'); + this.trace('received HTTP/2 end of data flag'); this.tryPush(null); }); stream.on('close', async () => { - trace( - this.methodName + ' HTTP/2 stream closed with code ' + stream.rstCode - ); + this.trace('HTTP/2 stream closed with code ' + stream.rstCode); let code: Status; let details = ''; switch (stream.rstCode) { @@ -437,13 +425,14 @@ export class Http2CallStream extends Duplex implements Call { stream.write(this.pendingWrite, this.pendingWriteCallback); } if (this.pendingFinalCallback) { + this.trace('calling end() on HTTP/2 stream'); stream.end(this.pendingFinalCallback); } } } sendMetadata(metadata: Metadata): void { - trace(this.methodName + ' Sending metadata'); + this.trace('Sending metadata'); this.channel._startCallStream(this, metadata); } @@ -522,11 +511,7 @@ export class Http2CallStream extends Duplex implements Call { } _write(chunk: WriteObject, encoding: string, cb: WriteCallback) { - trace( - this.methodName + - ' write() called with message of length ' + - chunk.message.length - ); + this.trace('write() called with message of length ' + chunk.message.length); this.filterStack.sendMessage(Promise.resolve(chunk)).then(message => { if (this.http2Stream === null) { this.pendingWrite = message.message; @@ -538,10 +523,11 @@ export class Http2CallStream extends Duplex implements Call { } _final(cb: Function) { - trace(this.methodName + ' end() called'); + this.trace('end() called'); if (this.http2Stream === null) { this.pendingFinalCallback = cb; } else { + this.trace('calling end() on HTTP/2 stream'); this.http2Stream.end(cb); } } diff --git a/packages/grpc-js/src/channel.ts b/packages/grpc-js/src/channel.ts index 8e45ccb4..a763e20d 100644 --- a/packages/grpc-js/src/channel.ts +++ b/packages/grpc-js/src/channel.ts @@ -47,6 +47,17 @@ export enum ConnectivityState { SHUTDOWN, } +let nextCallNumber = 0; + +function getNewCallNumber(): number { + const callNumber = nextCallNumber; + nextCallNumber += 1; + if (nextCallNumber >= Number.MAX_SAFE_INTEGER) { + nextCallNumber = 0; + } + return callNumber; +} + /** * An interface that represents a communication channel to a server specified * by a given address. @@ -357,10 +368,17 @@ export class ChannelImplementation implements Channel { if (this.connectivityState === ConnectivityState.SHUTDOWN) { throw new Error('Channel has been shut down'); } + const callNumber = getNewCallNumber(); trace( LogVerbosity.DEBUG, 'channel', - 'createCall(method="' + method + '", deadline=' + deadline + ')' + this.target + + ' createCall [' + + callNumber + + '] method="' + + method + + '", deadline=' + + deadline ); const finalOptions: CallStreamOptions = { deadline: @@ -374,7 +392,8 @@ export class ChannelImplementation implements Channel { this, finalOptions, this.filterStackFactory, - this.credentials._getCallCredentials() + this.credentials._getCallCredentials(), + callNumber ); return stream; }