From 96e3dde23da0f785f77f1b50503665c603d1f59b Mon Sep 17 00:00:00 2001 From: murgatroid99 Date: Tue, 24 Sep 2019 10:25:08 -0700 Subject: [PATCH] Return UNAVAILABLE status on TCP disconnect --- packages/grpc-js/src/call-stream.ts | 16 +++++++++++++- packages/grpc-js/src/subchannel.ts | 33 +++++++++++++++++++++++------ 2 files changed, 41 insertions(+), 8 deletions(-) diff --git a/packages/grpc-js/src/call-stream.ts b/packages/grpc-js/src/call-stream.ts index 71458600..d7176c54 100644 --- a/packages/grpc-js/src/call-stream.ts +++ b/packages/grpc-js/src/call-stream.ts @@ -27,6 +27,7 @@ import { Metadata } from './metadata'; import { ObjectDuplex, WriteCallback } from './object-stream'; import { StreamDecoder } from './stream-decoder'; import { ChannelImplementation } from './channel'; +import { Subchannel } from './subchannel'; const { HTTP2_HEADER_STATUS, @@ -112,6 +113,9 @@ export class Http2CallStream extends Duplex implements Call { // This is populated (non-null) if and only if the call has ended private finalStatus: StatusObject | null = null; + private subchannel: Subchannel | null = null; + private disconnectListener: () => void; + constructor( private readonly methodName: string, private readonly channel: ChannelImplementation, @@ -122,6 +126,9 @@ export class Http2CallStream extends Duplex implements Call { super({ objectMode: true }); this.filterStack = filterStackFactory.createFilter(this); this.credentials = channelCallCredentials; + this.disconnectListener = () => { + this.endCall({code: Status.UNAVAILABLE, details: 'Connection dropped', metadata: new Metadata()}); + }; } /** @@ -142,6 +149,10 @@ export class Http2CallStream extends Duplex implements Call { process.nextTick(() => { this.emit('status', status); }); + if (this.subchannel) { + this.subchannel.callUnref(); + this.subchannel.removeDisconnectListener(this.disconnectListener); + } } } @@ -239,11 +250,14 @@ export class Http2CallStream extends Duplex implements Call { })(); } - attachHttp2Stream(stream: http2.ClientHttp2Stream): void { + attachHttp2Stream(stream: http2.ClientHttp2Stream, subchannel: Subchannel): void { if (this.finalStatus !== null) { stream.close(NGHTTP2_CANCEL); } else { this.http2Stream = stream; + this.subchannel = subchannel; + subchannel.addDisconnectListener(this.disconnectListener); + subchannel.callRef(); stream.on('response', (headers, flags) => { switch (headers[HTTP2_HEADER_STATUS]) { // TODO(murgatroid99): handle 100 and 101 diff --git a/packages/grpc-js/src/subchannel.ts b/packages/grpc-js/src/subchannel.ts index 2683643b..65571586 100644 --- a/packages/grpc-js/src/subchannel.ts +++ b/packages/grpc-js/src/subchannel.ts @@ -84,6 +84,13 @@ export class Subchannel { */ private stateListeners: ConnectivityStateListener[] = []; + /** + * A list of listener functions that will be called when the underlying + * socket disconnects. Used for ending active calls with an UNAVAILABLE + * status. + */ + private disconnectListeners: (() => void)[] = []; + private backoffTimeout: BackoffTimeout; /** @@ -274,6 +281,11 @@ export class Subchannel { switch (newState) { case ConnectivityState.READY: this.stopBackoff(); + this.session!.socket.once('close', () => { + for (const listener of this.disconnectListeners) { + listener(); + } + }); break; case ConnectivityState.CONNECTING: this.startBackoff(); @@ -322,7 +334,7 @@ export class Subchannel { } } - private callRef() { + callRef() { if (this.callRefcount === 0) { if (this.session) { this.session.ref(); @@ -332,7 +344,7 @@ export class Subchannel { this.callRefcount += 1; } - private callUnref() { + callUnref() { this.callRefcount -= 1; if (this.callRefcount === 0) { if (this.session) { @@ -376,11 +388,7 @@ export class Subchannel { headers[HTTP2_HEADER_PATH] = callStream.getMethod(); headers[HTTP2_HEADER_TE] = 'trailers'; const http2Stream = this.session!.request(headers); - this.callRef(); - http2Stream.on('close', () => { - this.callUnref(); - }); - callStream.attachHttp2Stream(http2Stream); + callStream.attachHttp2Stream(http2Stream, this); } /** @@ -434,6 +442,17 @@ export class Subchannel { } } + addDisconnectListener(listener: () => void) { + this.disconnectListeners.push(listener); + } + + removeDisconnectListener(listener: () => void) { + const listenerIndex = this.disconnectListeners.indexOf(listener); + if (listenerIndex > -1) { + this.disconnectListeners.splice(listenerIndex, 1); + } + } + /** * Reset the backoff timeout, and immediately start connecting if in backoff. */