grpc-js: Don't end calls when receiving GOAWAY

This commit is contained in:
Michael Lumish 2023-01-10 15:24:22 -08:00
parent 13337aaa47
commit b3b6310f04
3 changed files with 87 additions and 20 deletions

View File

@ -1,6 +1,6 @@
{
"name": "@grpc/grpc-js",
"version": "1.8.2",
"version": "1.8.3",
"description": "gRPC Library for Node - pure JS implementation",
"homepage": "https://grpc.io/",
"repository": "https://github.com/grpc/grpc-node/tree/master/packages/grpc-js",

View File

@ -161,7 +161,7 @@ class Http2Transport implements Transport {
session.once('close', () => {
this.trace('session closed');
this.stopKeepalivePings();
this.handleDisconnect(false);
this.handleDisconnect();
});
session.once('goaway', (errorCode: number, lastStreamID: number, opaqueData: Buffer) => {
let tooManyPings = false;
@ -177,7 +177,7 @@ class Http2Transport implements Transport {
'connection closed by GOAWAY with code ' +
errorCode
);
this.handleDisconnect(tooManyPings);
this.reportDisconnectToOwner(tooManyPings);
});
session.once('error', error => {
/* Do nothing here. Any error should also trigger a close event, which is
@ -263,15 +263,35 @@ class Http2Transport implements Transport {
logging.trace(LogVerbosity.DEBUG, 'transport_internals', '(' + this.channelzRef.id + ') ' + this.subchannelAddressString + ' ' + text);
}
private handleDisconnect(tooManyPings: boolean) {
/**
* Indicate to the owner of this object that this transport should no longer
* be used. That happens if the connection drops, or if the server sends a
* GOAWAY.
* @param tooManyPings If true, this was triggered by a GOAWAY with data
* indicating that the session was closed becaues the client sent too many
* pings.
* @returns
*/
private reportDisconnectToOwner(tooManyPings: boolean) {
if (this.disconnectHandled) {
return;
}
this.disconnectHandled = true;
this.disconnectListeners.forEach(listener => listener(tooManyPings));
for (const call of this.activeCalls) {
call.onDisconnect();
}
}
/**
* Handle connection drops, but not GOAWAYs.
*/
private handleDisconnect() {
this.reportDisconnectToOwner(false);
/* Give calls an event loop cycle to finish naturally before reporting the
* disconnnection to them. */
setImmediate(() => {
for (const call of this.activeCalls) {
call.onDisconnect();
}
});
}
addDisconnectListener(listener: TransportDisconnectListener): void {
@ -294,7 +314,7 @@ class Http2Transport implements Transport {
if (!this.keepaliveTimeoutId) {
this.keepaliveTimeoutId = setTimeout(() => {
this.keepaliveTrace('Ping timeout passed without response');
this.handleDisconnect(false);
this.handleDisconnect();
}, this.keepaliveTimeoutMs);
this.keepaliveTimeoutId.unref?.();
}
@ -308,7 +328,7 @@ class Http2Transport implements Transport {
} catch (e) {
/* If we fail to send a ping, the connection is no longer functional, so
* we should discard it. */
this.handleDisconnect(false);
this.handleDisconnect();
}
}
@ -365,7 +385,7 @@ class Http2Transport implements Transport {
try {
http2Stream = this.session!.request(headers);
} catch (e) {
this.handleDisconnect(false);
this.handleDisconnect();
throw e;
}
this.flowControlTrace(

View File

@ -27,9 +27,9 @@ import * as grpc from '../src';
import { Server, ServerCredentials } from '../src';
import { ServiceError } from '../src/call';
import { ServiceClient, ServiceClientConstructor } from '../src/make-client';
import { sendUnaryData, ServerUnaryCall } from '../src/server-call';
import { sendUnaryData, ServerUnaryCall, ServerDuplexStream } from '../src/server-call';
import { loadProtoFile } from './common';
import { assert2, loadProtoFile } from './common';
import { TestServiceClient, TestServiceHandlers } from './generated/TestService';
import { ProtoGrpcType as TestServiceGrpcType } from './generated/test_service';
import { Request__Output } from './generated/Request';
@ -458,18 +458,28 @@ describe('Server', () => {
describe('Echo service', () => {
let server: Server;
let client: ServiceClient;
const protoFile = path.join(__dirname, 'fixtures', 'echo_service.proto');
const echoService = loadProtoFile(protoFile)
.EchoService as ServiceClientConstructor;
const serviceImplementation = {
echo(call: ServerUnaryCall<any, any>, callback: sendUnaryData<any>) {
callback(null, call.request);
},
echoBidiStream(call: ServerDuplexStream<any, any>) {
call.on('data', data => {
call.write(data);
});
call.on('end', () => {
call.end();
});
}
};
before(done => {
const protoFile = path.join(__dirname, 'fixtures', 'echo_service.proto');
const echoService = loadProtoFile(protoFile)
.EchoService as ServiceClientConstructor;
server = new Server();
server.addService(echoService.service, {
echo(call: ServerUnaryCall<any, any>, callback: sendUnaryData<any>) {
callback(null, call.request);
},
});
server.addService(echoService.service, serviceImplementation);
server.bindAsync(
'localhost:0',
@ -501,6 +511,43 @@ describe('Echo service', () => {
}
);
});
/* This test passes on Node 18 but fails on Node 16. The failure appears to
* be caused by https://github.com/nodejs/node/issues/42713 */
it.skip('should continue a stream after server shutdown', done => {
const server2 = new Server();
server2.addService(echoService.service, serviceImplementation);
server2.bindAsync('localhost:0', ServerCredentials.createInsecure(), (err, port) => {
if (err) {
done(err);
return;
}
const client2 = new echoService(`localhost:${port}`, grpc.credentials.createInsecure());
server2.start();
const stream = client2.echoBidiStream();
const totalMessages = 5;
let messagesSent = 0;
stream.write({ value: 'test value', value2: messagesSent});
messagesSent += 1;
stream.on('data', () => {
if (messagesSent === 1) {
server2.tryShutdown(assert2.mustCall(() => {}));
}
if (messagesSent >= totalMessages) {
stream.end();
} else {
stream.write({ value: 'test value', value2: messagesSent});
messagesSent += 1;
}
});
stream.on('status', assert2.mustCall((status: grpc.StatusObject) => {
assert.strictEqual(status.code, grpc.status.OK);
assert.strictEqual(messagesSent, totalMessages);
}));
stream.on('error', () => {});
assert2.afterMustCallsSatisfied(done);
});
});
});
describe('Generic client and server', () => {