Return UNAVAILABLE status on TCP disconnect

This commit is contained in:
murgatroid99 2019-09-24 10:25:08 -07:00
parent 8780f49c0a
commit 96e3dde23d
2 changed files with 41 additions and 8 deletions

View File

@ -27,6 +27,7 @@ import { Metadata } from './metadata';
import { ObjectDuplex, WriteCallback } from './object-stream';
import { StreamDecoder } from './stream-decoder';
import { ChannelImplementation } from './channel';
import { Subchannel } from './subchannel';
const {
HTTP2_HEADER_STATUS,
@ -112,6 +113,9 @@ export class Http2CallStream extends Duplex implements Call {
// This is populated (non-null) if and only if the call has ended
private finalStatus: StatusObject | null = null;
private subchannel: Subchannel | null = null;
private disconnectListener: () => void;
constructor(
private readonly methodName: string,
private readonly channel: ChannelImplementation,
@ -122,6 +126,9 @@ export class Http2CallStream extends Duplex implements Call {
super({ objectMode: true });
this.filterStack = filterStackFactory.createFilter(this);
this.credentials = channelCallCredentials;
this.disconnectListener = () => {
this.endCall({code: Status.UNAVAILABLE, details: 'Connection dropped', metadata: new Metadata()});
};
}
/**
@ -142,6 +149,10 @@ export class Http2CallStream extends Duplex implements Call {
process.nextTick(() => {
this.emit('status', status);
});
if (this.subchannel) {
this.subchannel.callUnref();
this.subchannel.removeDisconnectListener(this.disconnectListener);
}
}
}
@ -239,11 +250,14 @@ export class Http2CallStream extends Duplex implements Call {
})();
}
attachHttp2Stream(stream: http2.ClientHttp2Stream): void {
attachHttp2Stream(stream: http2.ClientHttp2Stream, subchannel: Subchannel): void {
if (this.finalStatus !== null) {
stream.close(NGHTTP2_CANCEL);
} else {
this.http2Stream = stream;
this.subchannel = subchannel;
subchannel.addDisconnectListener(this.disconnectListener);
subchannel.callRef();
stream.on('response', (headers, flags) => {
switch (headers[HTTP2_HEADER_STATUS]) {
// TODO(murgatroid99): handle 100 and 101

View File

@ -84,6 +84,13 @@ export class Subchannel {
*/
private stateListeners: ConnectivityStateListener[] = [];
/**
* A list of listener functions that will be called when the underlying
* socket disconnects. Used for ending active calls with an UNAVAILABLE
* status.
*/
private disconnectListeners: (() => void)[] = [];
private backoffTimeout: BackoffTimeout;
/**
@ -274,6 +281,11 @@ export class Subchannel {
switch (newState) {
case ConnectivityState.READY:
this.stopBackoff();
this.session!.socket.once('close', () => {
for (const listener of this.disconnectListeners) {
listener();
}
});
break;
case ConnectivityState.CONNECTING:
this.startBackoff();
@ -322,7 +334,7 @@ export class Subchannel {
}
}
private callRef() {
callRef() {
if (this.callRefcount === 0) {
if (this.session) {
this.session.ref();
@ -332,7 +344,7 @@ export class Subchannel {
this.callRefcount += 1;
}
private callUnref() {
callUnref() {
this.callRefcount -= 1;
if (this.callRefcount === 0) {
if (this.session) {
@ -376,11 +388,7 @@ export class Subchannel {
headers[HTTP2_HEADER_PATH] = callStream.getMethod();
headers[HTTP2_HEADER_TE] = 'trailers';
const http2Stream = this.session!.request(headers);
this.callRef();
http2Stream.on('close', () => {
this.callUnref();
});
callStream.attachHttp2Stream(http2Stream);
callStream.attachHttp2Stream(http2Stream, this);
}
/**
@ -434,6 +442,17 @@ export class Subchannel {
}
}
addDisconnectListener(listener: () => void) {
this.disconnectListeners.push(listener);
}
removeDisconnectListener(listener: () => void) {
const listenerIndex = this.disconnectListeners.indexOf(listener);
if (listenerIndex > -1) {
this.disconnectListeners.splice(listenerIndex, 1);
}
}
/**
* Reset the backoff timeout, and immediately start connecting if in backoff.
*/