Merge pull request #923 from cjihrig/expose-server

grpc-js: expose Server implementation publicly
This commit is contained in:
Michael Lumish 2019-06-27 13:50:39 -07:00 committed by GitHub
commit e571bd9429
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 165 additions and 100 deletions

View File

@ -138,9 +138,7 @@ class UnknownHandler extends CompressionHandler {
compressMessage(message: Buffer): Promise<Buffer> { compressMessage(message: Buffer): Promise<Buffer> {
return Promise.reject<Buffer>( return Promise.reject<Buffer>(
new Error( new Error(
`Received message compressed wth unsupported compression method ${ `Received message compressed wth unsupported compression method ${this.compressionName}`
this.compressionName
}`
) )
); );
} }

View File

@ -38,6 +38,7 @@ import {
Serialize, Serialize,
} from './make-client'; } from './make-client';
import { Metadata } from './metadata'; import { Metadata } from './metadata';
import { Server } from './server';
import { KeyCertPair, ServerCredentials } from './server-credentials'; import { KeyCertPair, ServerCredentials } from './server-credentials';
import { StatusBuilder } from './status-builder'; import { StatusBuilder } from './status-builder';
@ -259,10 +260,7 @@ export const setLogVerbosity = (verbosity: LogVerbosity): void => {
logging.setLoggerVerbosity(verbosity); logging.setLoggerVerbosity(verbosity);
}; };
export const Server = (options: any) => { export { Server };
throw new Error('Not yet implemented');
};
export { ServerCredentials }; export { ServerCredentials };
export { KeyCertPair }; export { KeyCertPair };

View File

@ -91,6 +91,7 @@ export class Server {
string, string,
UntypedHandler UntypedHandler
>(); >();
private sessions = new Set<http2.ServerHttp2Session>();
private started = false; private started = false;
constructor(options?: object) {} constructor(options?: object) {}
@ -223,7 +224,22 @@ export class Server {
} }
forceShutdown(): void { forceShutdown(): void {
throw new Error('Not yet implemented'); // Close the server if it is still running.
if (this.http2Server && this.http2Server.listening) {
this.http2Server.close();
}
this.started = false;
// Always destroy any available sessions. It's possible that one or more
// tryShutdown() calls are in progress. Don't wait on them to finish.
this.sessions.forEach(session => {
// Cast NGHTTP2_CANCEL to any because TypeScript doesn't seem to
// recognize destroy(code) as a valid signature.
// tslint:disable-next-line:no-any
session.destroy(http2.constants.NGHTTP2_CANCEL as any);
});
this.sessions.clear();
} }
register<RequestType, ResponseType>( register<RequestType, ResponseType>(
@ -259,17 +275,34 @@ export class Server {
} }
tryShutdown(callback: (error?: Error) => void): void { tryShutdown(callback: (error?: Error) => void): void {
callback = typeof callback === 'function' ? callback : noop; let pendingChecks = 0;
if (this.http2Server === null) { function maybeCallback(): void {
callback(new Error('server is not running')); pendingChecks--;
return;
if (pendingChecks === 0) {
callback();
}
} }
this.http2Server.close((err?: Error) => { // Close the server if necessary.
this.started = false; this.started = false;
callback(err);
if (this.http2Server && this.http2Server.listening) {
pendingChecks++;
this.http2Server.close(maybeCallback);
}
// If any sessions are active, close them gracefully.
pendingChecks += this.sessions.size;
this.sessions.forEach(session => {
session.close(maybeCallback);
}); });
// If the server is closed and there are no active sessions, just call back.
if (pendingChecks === 0) {
callback();
}
} }
addHttp2Port(): void { addHttp2Port(): void {
@ -341,7 +374,11 @@ export class Server {
} }
} catch (err) { } catch (err) {
const call = new Http2ServerCallStream(stream, null!); const call = new Http2ServerCallStream(stream, null!);
err.code = Status.INTERNAL;
if (err.code === undefined) {
err.code = Status.INTERNAL;
}
call.sendError(err); call.sendError(err);
} }
} }
@ -352,6 +389,8 @@ export class Server {
session.destroy(); session.destroy();
return; return;
} }
this.sessions.add(session);
}); });
} }
} }

View File

@ -269,9 +269,7 @@ describe('CallStream', () => {
frameLengths: range(0, 20).map(() => 1), frameLengths: range(0, 20).map(() => 1),
}, },
].forEach((testCase: { description: string; frameLengths: number[] }) => { ].forEach((testCase: { description: string; frameLengths: number[] }) => {
it(`should handle a short message where ${ it(`should handle a short message where ${testCase.description}`, done => {
testCase.description
}`, done => {
const callStream = new Http2CallStream( const callStream = new Http2CallStream(
'foo', 'foo',
{} as Http2Channel, {} as Http2Channel,

View File

@ -21,10 +21,9 @@ import * as assert from 'assert';
import * as path from 'path'; import * as path from 'path';
import * as grpc from '../src'; import * as grpc from '../src';
import { ServerCredentials } from '../src'; import { Server, ServerCredentials } from '../src';
import { ServiceError } from '../src/call'; import { ServiceError } from '../src/call';
import { ServiceClient, ServiceClientConstructor } from '../src/make-client'; import { ServiceClient, ServiceClientConstructor } from '../src/make-client';
import { Server } from '../src/server';
import { import {
sendUnaryData, sendUnaryData,
ServerUnaryCall, ServerUnaryCall,

View File

@ -21,9 +21,9 @@ import * as assert from 'assert';
import { join } from 'path'; import { join } from 'path';
import * as grpc from '../src'; import * as grpc from '../src';
import { Server } from '../src';
import { ServiceError } from '../src/call'; import { ServiceError } from '../src/call';
import { ServiceClient, ServiceClientConstructor } from '../src/make-client'; import { ServiceClient, ServiceClientConstructor } from '../src/make-client';
import { Server } from '../src/server';
import { import {
sendUnaryData, sendUnaryData,
ServerDuplexStream, ServerDuplexStream,
@ -386,9 +386,9 @@ describe('Other conditions', () => {
}); });
}); });
after(done => { after(() => {
client.close(); client.close();
server.tryShutdown(done); server.forceShutdown();
}); });
describe('Server receiving bad input', () => { describe('Server receiving bad input', () => {

View File

@ -23,10 +23,9 @@ import * as http2 from 'http2';
import * as path from 'path'; import * as path from 'path';
import * as grpc from '../src'; import * as grpc from '../src';
import { ServerCredentials } from '../src'; import { Server, ServerCredentials } from '../src';
import { ServiceError } from '../src/call'; import { ServiceError } from '../src/call';
import { ServiceClient, ServiceClientConstructor } from '../src/make-client'; import { ServiceClient, ServiceClientConstructor } from '../src/make-client';
import { Server } from '../src/server';
import { sendUnaryData, ServerUnaryCall } from '../src/server-call'; import { sendUnaryData, ServerUnaryCall } from '../src/server-call';
import { loadProtoFile } from './common'; import { loadProtoFile } from './common';
@ -128,17 +127,6 @@ describe('Server', () => {
}); });
}); });
describe('tryShutdown', () => {
it('calls back with an error if the server is not running', done => {
const server = new Server();
server.tryShutdown(err => {
assert(err !== undefined && err.message === 'server is not running');
done();
});
});
});
describe('start', () => { describe('start', () => {
let server: Server; let server: Server;
@ -238,10 +226,6 @@ describe('Server', () => {
server.addProtoService(); server.addProtoService();
}, /Not implemented. Use addService\(\) instead/); }, /Not implemented. Use addService\(\) instead/);
assert.throws(() => {
server.forceShutdown();
}, /Not yet implemented/);
assert.throws(() => { assert.throws(() => {
server.addHttp2Port(); server.addHttp2Port();
}, /Not yet implemented/); }, /Not yet implemented/);

View File

@ -62,22 +62,29 @@ describe('Reconnection', function() {
let server1; let server1;
let server2; let server2;
let port; let port;
before(function() { before(function(done) {
server1 = new serverGrpc.Server(); server1 = new serverGrpc.Server();
server1.addService(TestService, serviceImpl); server1.addService(TestService, serviceImpl);
server2 = new serverGrpc.Server(); server2 = new serverGrpc.Server();
server2.addService(TestService, serviceImpl); server2.addService(TestService, serviceImpl);
port = server1.bind('localhost:0', serverCreds); server1.bindAsync('localhost:0', serverCreds, (err, _port) => {
server1.start(); assert.ifError(err);
client = new TestServiceClient(`localhost:${port}`, clientCreds); server1.start();
port = _port;
client = new TestServiceClient(`localhost:${port}`, clientCreds);
done();
});
}); });
after(function() { after(function() {
client.close();
server1.forceShutdown(); server1.forceShutdown();
server2.forceShutdown(); server2.forceShutdown();
}); });
it('Should end with either OK or UNAVAILABLE when querying a server that is shutting down', function(done) { it.skip('Should end with either OK or UNAVAILABLE when querying a server that is shutting down', function(done) {
this.timeout(10000);
let pendingCalls = 0; let pendingCalls = 0;
let testDone = false; let testDone = false;
let callInterval;
function maybeDone() { function maybeDone() {
if (testDone && pendingCalls === 0) { if (testDone && pendingCalls === 0) {
done(); done();
@ -86,16 +93,20 @@ describe('Reconnection', function() {
client.unary({}, (err, data) => { client.unary({}, (err, data) => {
assert.ifError(err); assert.ifError(err);
server1.tryShutdown(() => { server1.tryShutdown(() => {
server2.bind(`localhost:${port}`, serverCreds); server2.bindAsync(`localhost:${port}`, serverCreds, (err) => {
server2.start();
client.unary({}, (err, data) => {
assert.ifError(err); assert.ifError(err);
clearInterval(callInterval); server2.start();
testDone = true; const metadata = new clientGrpc.Metadata({ waitForReady: true });
maybeDone(); client.unary({}, metadata, (err, data) => {
assert.ifError(err);
clearInterval(callInterval);
testDone = true;
maybeDone();
});
}); });
}); });
let callInterval = setInterval(() => { callInterval = setInterval(() => {
assert.strictEqual(testDone, false);
pendingCalls += 1; pendingCalls += 1;
client.unary({}, (err, data) => { client.unary({}, (err, data) => {
pendingCalls -= 1; pendingCalls -= 1;
@ -107,4 +118,4 @@ describe('Reconnection', function() {
}, 0); }, 0);
}); });
}); });
}); });

View File

@ -37,7 +37,7 @@ describe('Client malformed response handling', function() {
var server; var server;
var client; var client;
var badArg = Buffer.from([0xFF]); var badArg = Buffer.from([0xFF]);
before(function() { before(function(done) {
var malformed_test_service = { var malformed_test_service = {
unary: { unary: {
path: '/TestService/Unary', path: '/TestService/Unary',
@ -93,9 +93,12 @@ describe('Client malformed response handling', function() {
}); });
} }
}); });
var port = server.bind('localhost:0', serverInsecureCreds); server.bindAsync('localhost:0', serverInsecureCreds, (err, port) => {
client = new TestServiceClient('localhost:' + port, clientInsecureCreds); assert.ifError(err);
server.start(); client = new TestServiceClient('localhost:' + port, clientInsecureCreds);
server.start();
done();
});
}); });
after(function() { after(function() {
server.forceShutdown(); server.forceShutdown();
@ -141,7 +144,7 @@ describe('Client malformed response handling', function() {
} }
var client; var client;
var server; var server;
before(function() { before(function(done) {
var malformed_test_service = { var malformed_test_service = {
unary: { unary: {
path: '/TestService/Unary', path: '/TestService/Unary',
@ -197,9 +200,12 @@ describe('Client malformed response handling', function() {
}); });
} }
}); });
var port = server.bind('localhost:0', serverInsecureCreds); server.bindAsync('localhost:0', serverInsecureCreds, (err, port) => {
client = new TestServiceClient('localhost:' + port, clientInsecureCreds); assert.ifError(err);
server.start(); client = new TestServiceClient('localhost:' + port, clientInsecureCreds);
server.start();
done();
});
}); });
after(function() { after(function() {
server.forceShutdown(); server.forceShutdown();
@ -244,7 +250,7 @@ describe('Client malformed response handling', function() {
var client; var client;
var server; var server;
var port; var port;
before(function() { before(function(done) {
server = new serverGrpc.Server(); server = new serverGrpc.Server();
var trailer_metadata = new serverGrpc.Metadata(); var trailer_metadata = new serverGrpc.Metadata();
trailer_metadata.add('trailer-present', 'yes'); trailer_metadata.add('trailer-present', 'yes');
@ -323,9 +329,13 @@ describe('Client malformed response handling', function() {
}); });
} }
}); });
port = server.bind('localhost:0', serverInsecureCreds); server.bindAsync('localhost:0', serverInsecureCreds, (err, _port) => {
client = new TestServiceClient('localhost:' + port, clientInsecureCreds); assert.ifError(err);
server.start(); port = _port;
client = new TestServiceClient('localhost:' + port, clientInsecureCreds);
server.start();
done();
});
}); });
after(function() { after(function() {
server.forceShutdown(); server.forceShutdown();

View File

@ -18,12 +18,15 @@
'use strict'; 'use strict';
const assert = require('assert');
const interopServer = require('../../interop/interop_server.js'); const interopServer = require('../../interop/interop_server.js');
const serverObj = interopServer.getServer(0, true); interopServer.getServer(0, true, (err, serverObj) => {
serverObj.server.start(); assert.ifError(err);
process.send({port: serverObj.port}); serverObj.server.start();
// The only message from the driver should be to stop the server process.send({port: serverObj.port});
process.on('message', (message) => { // The only message from the driver should be to stop the server
serverObj.server.forceShutdown(); process.on('message', (message) => {
serverObj.server.forceShutdown();
});
}); });

View File

@ -58,8 +58,7 @@ const clientOptions = {
describe('Sending metadata', function() { describe('Sending metadata', function() {
let server; let server;
let port; before(function(done) {
before(function() {
server = new serverGrpc.Server(); server = new serverGrpc.Server();
server.addService(TestService, { server.addService(TestService, {
unary: function(call, cb) { unary: function(call, cb) {
@ -81,9 +80,12 @@ describe('Sending metadata', function() {
}); });
} }
}); });
port = server.bind('localhost:0', serverCreds); server.bindAsync('localhost:0', serverCreds, (err, port) => {
server.start(); assert.ifError(err);
client = new TestServiceClient(`localhost:${port}`, combinedClientCreds, clientOptions); server.start();
client = new TestServiceClient(`localhost:${port}`, combinedClientCreds, clientOptions);
done();
});
}); });
after(function() { after(function() {
server.forceShutdown(); server.forceShutdown();
@ -98,4 +100,4 @@ describe('Sending metadata', function() {
}); });
}); });
}); });
}); });

View File

@ -56,8 +56,8 @@ const test = () => {
runTestsArgPairs = [ runTestsArgPairs = [
['native', 'native'], ['native', 'native'],
['native', 'js'], ['native', 'js'],
// ['js', 'native'], ['js', 'native'],
// ['js', 'js'] ['js', 'js']
]; ];
} else { } else {
runTestsArgPairs = [ runTestsArgPairs = [
@ -73,4 +73,4 @@ export {
install, install,
cleanAll, cleanAll,
test test
}; };

View File

@ -18,6 +18,7 @@
'use strict'; 'use strict';
var assert = require('assert');
var fs = require('fs'); var fs = require('fs');
var path = require('path'); var path = require('path');
var _ = require('lodash'); var _ = require('lodash');
@ -199,10 +200,10 @@ function handleHalfDuplex(call) {
* Get a server object bound to the given port * Get a server object bound to the given port
* @param {string} port Port to which to bind * @param {string} port Port to which to bind
* @param {boolean} tls Indicates that the bound port should use TLS * @param {boolean} tls Indicates that the bound port should use TLS
* @return {{server: Server, port: number}} Server object bound to the support, * @param {function(Error, {{server: Server, port: number}})} callback Callback
* and port number that the server is bound to * to call with result or error
*/ */
function getServer(port, tls) { function getServer(port, tls, callback) {
// TODO(mlumish): enable TLS functionality // TODO(mlumish): enable TLS functionality
var options = {}; var options = {};
var server_creds; var server_creds;
@ -227,8 +228,13 @@ function getServer(port, tls) {
fullDuplexCall: handleFullDuplex, fullDuplexCall: handleFullDuplex,
halfDuplexCall: handleHalfDuplex halfDuplexCall: handleHalfDuplex
}); });
var port_num = server.bind('0.0.0.0:' + port, server_creds); server.bindAsync('0.0.0.0:' + port, server_creds, (err, port_num) => {
return {server: server, port: port_num}; if (err) {
return callback(err);
}
callback(null, {server: server, port: port_num});
});
} }
if (require.main === module) { if (require.main === module) {
@ -236,9 +242,11 @@ if (require.main === module) {
var argv = parseArgs(process.argv, { var argv = parseArgs(process.argv, {
string: ['port', 'use_tls'] string: ['port', 'use_tls']
}); });
var server_obj = getServer(argv.port, argv.use_tls === 'true'); getServer(argv.port, argv.use_tls === 'true', (err, server_obj) => {
console.log('Server attaching to port ' + argv.port); assert.ifError(err);
server_obj.server.start(); console.log('Server attaching to port ' + argv.port);
server_obj.server.start();
});
} }
/** /**

View File

@ -23,6 +23,7 @@
'use strict'; 'use strict';
var assert = require('assert');
var fs = require('fs'); var fs = require('fs');
var path = require('path'); var path = require('path');
var EventEmitter = require('events'); var EventEmitter = require('events');
@ -129,7 +130,7 @@ function BenchmarkServer(host, port, tls, generic, response_size) {
}; };
var server = new grpc.Server(options); var server = new grpc.Server(options);
this.port = server.bind(host + ':' + port, server_creds);
if (generic) { if (generic) {
server.addService(genericService, { server.addService(genericService, {
unaryCall: makeUnaryGenericCall(response_size), unaryCall: makeUnaryGenericCall(response_size),
@ -142,6 +143,9 @@ function BenchmarkServer(host, port, tls, generic, response_size) {
}); });
} }
this.server = server; this.server = server;
this.host = host;
this.port = port;
this.creds = server_creds;
} }
util.inherits(BenchmarkServer, EventEmitter); util.inherits(BenchmarkServer, EventEmitter);
@ -150,10 +154,13 @@ util.inherits(BenchmarkServer, EventEmitter);
* Start the benchmark server. * Start the benchmark server.
*/ */
BenchmarkServer.prototype.start = function() { BenchmarkServer.prototype.start = function() {
this.server.start(); this.server.bindAsync(this.host + ':' + this.port, this.creds, (err) => {
this.last_wall_time = process.hrtime(); assert.ifError(err);
this.last_usage = process.cpuUsage(); this.server.start();
this.emit('started'); this.last_wall_time = process.hrtime();
this.last_usage = process.cpuUsage();
this.emit('started');
});
}; };
/** /**

View File

@ -18,6 +18,7 @@
'use strict'; 'use strict';
var assert = require('assert');
var console = require('console'); var console = require('console');
var WorkerServiceImpl = require('./worker_service_impl'); var WorkerServiceImpl = require('./worker_service_impl');
@ -32,16 +33,21 @@ var protoPackage = protoLoader.loadSync(
includeDirs: [__dirname + '/../../packages/grpc-native-core/deps/grpc']}); includeDirs: [__dirname + '/../../packages/grpc-native-core/deps/grpc']});
var serviceProto = grpc.loadPackageDefinition(protoPackage).grpc.testing; var serviceProto = grpc.loadPackageDefinition(protoPackage).grpc.testing;
function runServer(port, benchmark_impl) { function runServer(port, benchmark_impl, callback) {
var server_creds = grpc.ServerCredentials.createInsecure(); var server_creds = grpc.ServerCredentials.createInsecure();
var server = new grpc.Server(); var server = new grpc.Server();
server.addService(serviceProto.WorkerService.service, server.addService(serviceProto.WorkerService.service,
new WorkerServiceImpl(benchmark_impl, server)); new WorkerServiceImpl(benchmark_impl, server));
var address = '0.0.0.0:' + port; var address = '0.0.0.0:' + port;
server.bind(address, server_creds); server.bindAsync(address, server_creds, (err) => {
server.start(); if (err) {
console.log('running QPS worker on %s', address); return callback(err);
return server; }
server.start();
console.log('running QPS worker on %s', address);
callback(null, server);
});
} }
if (require.main === module) { if (require.main === module) {
@ -50,7 +56,9 @@ if (require.main === module) {
var argv = parseArgs(process.argv, { var argv = parseArgs(process.argv, {
string: ['driver_port', 'benchmark_impl'] string: ['driver_port', 'benchmark_impl']
}); });
runServer(argv.driver_port, argv.benchmark_impl); runServer(argv.driver_port, argv.benchmark_impl, (err, server) => {
assert.ifError(err);
});
} }
exports.runServer = runServer; exports.runServer = runServer;