Merge pull request #2616 from murgatroid99/grpc-js_server_drain

grpc-js: Implement `Server#drain`
This commit is contained in:
Michael Lumish 2023-11-15 10:26:43 -08:00 committed by GitHub
commit f04db5b87a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 102 additions and 0 deletions

View File

@ -853,6 +853,50 @@ export class Server {
}
}
/**
* Gracefully close all connections associated with a previously bound port.
* After the grace time, forcefully close all remaining open connections.
*
* If port 0 was bound, only the actual bound port can be
* drained. For example, if bindAsync was called with "localhost:0" and the
* bound port result was 54321, it can be drained as "localhost:54321".
* @param port
* @param graceTimeMs
* @returns
*/
drain(port: string, graceTimeMs: number): void {
this.trace('drain port=' + port + ' graceTimeMs=' + graceTimeMs);
const portUri = this.normalizePort(port);
const splitPort = splitHostPort(portUri.path);
if (splitPort?.port === 0) {
throw new Error('Cannot drain port 0');
}
const boundPortObject = this.boundPorts.get(uriToString(portUri));
if (!boundPortObject) {
return;
}
const allSessions: Set<http2.Http2Session> = new Set();
for (const http2Server of boundPortObject.listeningServers) {
const serverEntry = this.http2Servers.get(http2Server);
if (!serverEntry) {
continue;
}
for (const session of serverEntry.sessions) {
allSessions.add(session);
this.closeSession(session, () => {
allSessions.delete(session);
});
}
}
/* After the grace time ends, send another goaway to all remaining sessions
* with the CANCEL code. */
setTimeout(() => {
for (const session of allSessions) {
session.destroy(http2.constants.NGHTTP2_CANCEL as any);
}
}, graceTimeMs).unref?.();
}
forceShutdown(): void {
for (const boundPortObject of this.boundPorts.values()) {
boundPortObject.cancelled = true;

View File

@ -228,6 +228,64 @@ describe('Server', () => {
});
});
describe.only('drain', () => {
let client: ServiceClient;
let portNumber: number;
const protoFile = path.join(__dirname, 'fixtures', 'echo_service.proto');
const echoService = loadProtoFile(protoFile)
.EchoService as ServiceClientConstructor;
const serviceImplementation = {
echo(call: ServerUnaryCall<any, any>, callback: sendUnaryData<any>) {
callback(null, call.request);
},
echoBidiStream(call: ServerDuplexStream<any, any>) {
call.on('data', data => {
call.write(data);
});
call.on('end', () => {
call.end();
});
},
};
beforeEach(done => {
server.addService(echoService.service, serviceImplementation);
server.bindAsync(
'localhost:0',
ServerCredentials.createInsecure(),
(err, port) => {
assert.ifError(err);
portNumber = port;
client = new echoService(
`localhost:${port}`,
grpc.credentials.createInsecure()
);
server.start();
done();
}
);
});
afterEach(done => {
client.close();
server.tryShutdown(done);
});
it('Should cancel open calls after the grace period ends', done => {
const call = client.echoBidiStream();
call.on('error', (error: ServiceError) => {
assert.strictEqual(error.code, grpc.status.CANCELLED);
done();
});
call.on('data', () => {
server.drain(`localhost:${portNumber!}`, 100);
});
call.write({value: 'abc'});
});
});
describe('start', () => {
let server: Server;