mirror of https://github.com/grpc/grpc-node.git
Merge pull request #2993 from murgatroid99/node_server_graceful_shutdown
Prevent the Node server from locking up when shutting down
This commit is contained in:
commit
1ca2e73af3
|
@ -120,7 +120,7 @@ Server::Server(grpc_server *server) : wrapped_server(server) {
|
|||
Server::~Server() {
|
||||
this->ShutdownServer();
|
||||
grpc_completion_queue_shutdown(this->shutdown_queue);
|
||||
grpc_server_destroy(wrapped_server);
|
||||
grpc_server_destroy(this->wrapped_server);
|
||||
grpc_completion_queue_destroy(this->shutdown_queue);
|
||||
}
|
||||
|
||||
|
@ -139,8 +139,11 @@ void Server::Init(Handle<Object> exports) {
|
|||
NanSetPrototypeTemplate(tpl, "start",
|
||||
NanNew<FunctionTemplate>(Start)->GetFunction());
|
||||
|
||||
NanSetPrototypeTemplate(tpl, "shutdown",
|
||||
NanNew<FunctionTemplate>(Shutdown)->GetFunction());
|
||||
NanSetPrototypeTemplate(tpl, "tryShutdown",
|
||||
NanNew<FunctionTemplate>(TryShutdown)->GetFunction());
|
||||
NanSetPrototypeTemplate(
|
||||
tpl, "forceShutdown",
|
||||
NanNew<FunctionTemplate>(ForceShutdown)->GetFunction());
|
||||
|
||||
NanAssignPersistent(fun_tpl, tpl);
|
||||
Handle<Function> ctr = tpl->GetFunction();
|
||||
|
@ -153,14 +156,12 @@ bool Server::HasInstance(Handle<Value> val) {
|
|||
}
|
||||
|
||||
void Server::ShutdownServer() {
|
||||
if (this->wrapped_server != NULL) {
|
||||
grpc_server_shutdown_and_notify(this->wrapped_server,
|
||||
this->shutdown_queue,
|
||||
NULL);
|
||||
grpc_completion_queue_pluck(this->shutdown_queue, NULL,
|
||||
gpr_inf_future(GPR_CLOCK_REALTIME), NULL);
|
||||
this->wrapped_server = NULL;
|
||||
}
|
||||
grpc_server_shutdown_and_notify(this->wrapped_server,
|
||||
this->shutdown_queue,
|
||||
NULL);
|
||||
grpc_server_cancel_all_calls(this->wrapped_server);
|
||||
grpc_completion_queue_pluck(this->shutdown_queue, NULL,
|
||||
gpr_inf_future(GPR_CLOCK_REALTIME), NULL);
|
||||
}
|
||||
|
||||
NAN_METHOD(Server::New) {
|
||||
|
@ -222,9 +223,6 @@ NAN_METHOD(Server::RequestCall) {
|
|||
return NanThrowTypeError("requestCall can only be called on a Server");
|
||||
}
|
||||
Server *server = ObjectWrap::Unwrap<Server>(args.This());
|
||||
if (server->wrapped_server == NULL) {
|
||||
return NanThrowError("requestCall cannot be called on a shut down Server");
|
||||
}
|
||||
NewCallOp *op = new NewCallOp();
|
||||
unique_ptr<OpVec> ops(new OpVec());
|
||||
ops->push_back(unique_ptr<Op>(op));
|
||||
|
@ -256,10 +254,6 @@ NAN_METHOD(Server::AddHttp2Port) {
|
|||
"addHttp2Port's second argument must be ServerCredentials");
|
||||
}
|
||||
Server *server = ObjectWrap::Unwrap<Server>(args.This());
|
||||
if (server->wrapped_server == NULL) {
|
||||
return NanThrowError(
|
||||
"addHttp2Port cannot be called on a shut down Server");
|
||||
}
|
||||
ServerCredentials *creds_object = ObjectWrap::Unwrap<ServerCredentials>(
|
||||
args[1]->ToObject());
|
||||
grpc_server_credentials *creds = creds_object->GetWrappedServerCredentials();
|
||||
|
@ -281,21 +275,30 @@ NAN_METHOD(Server::Start) {
|
|||
return NanThrowTypeError("start can only be called on a Server");
|
||||
}
|
||||
Server *server = ObjectWrap::Unwrap<Server>(args.This());
|
||||
if (server->wrapped_server == NULL) {
|
||||
return NanThrowError("start cannot be called on a shut down Server");
|
||||
}
|
||||
grpc_server_start(server->wrapped_server);
|
||||
NanReturnUndefined();
|
||||
}
|
||||
|
||||
NAN_METHOD(ShutdownCallback) {
|
||||
NAN_METHOD(Server::TryShutdown) {
|
||||
NanScope();
|
||||
if (!HasInstance(args.This())) {
|
||||
return NanThrowTypeError("tryShutdown can only be called on a Server");
|
||||
}
|
||||
Server *server = ObjectWrap::Unwrap<Server>(args.This());
|
||||
unique_ptr<OpVec> ops(new OpVec());
|
||||
grpc_server_shutdown_and_notify(
|
||||
server->wrapped_server,
|
||||
CompletionQueueAsyncWorker::GetQueue(),
|
||||
new struct tag(new NanCallback(args[0].As<Function>()), ops.release(),
|
||||
shared_ptr<Resources>(nullptr)));
|
||||
CompletionQueueAsyncWorker::Next();
|
||||
NanReturnUndefined();
|
||||
}
|
||||
|
||||
NAN_METHOD(Server::Shutdown) {
|
||||
NAN_METHOD(Server::ForceShutdown) {
|
||||
NanScope();
|
||||
if (!HasInstance(args.This())) {
|
||||
return NanThrowTypeError("shutdown can only be called on a Server");
|
||||
return NanThrowTypeError("forceShutdown can only be called on a Server");
|
||||
}
|
||||
Server *server = ObjectWrap::Unwrap<Server>(args.This());
|
||||
server->ShutdownServer();
|
||||
|
|
|
@ -67,7 +67,8 @@ class Server : public ::node::ObjectWrap {
|
|||
static NAN_METHOD(RequestCall);
|
||||
static NAN_METHOD(AddHttp2Port);
|
||||
static NAN_METHOD(Start);
|
||||
static NAN_METHOD(Shutdown);
|
||||
static NAN_METHOD(TryShutdown);
|
||||
static NAN_METHOD(ForceShutdown);
|
||||
static NanCallback *constructor;
|
||||
static v8::Persistent<v8::FunctionTemplate> fun_tpl;
|
||||
|
||||
|
|
|
@ -623,11 +623,26 @@ function Server(options) {
|
|||
}
|
||||
server.requestCall(handleNewCall);
|
||||
};
|
||||
|
||||
/**
|
||||
* Shuts down the server.
|
||||
* Gracefully shuts down the server. The server will stop receiving new calls,
|
||||
* and any pending calls will complete. The callback will be called when all
|
||||
* pending calls have completed and the server is fully shut down. This method
|
||||
* is idempotent with itself and forceShutdown.
|
||||
* @param {function()} callback The shutdown complete callback
|
||||
*/
|
||||
this.shutdown = function() {
|
||||
server.shutdown();
|
||||
this.tryShutdown = function(callback) {
|
||||
server.tryShutdown(callback);
|
||||
};
|
||||
|
||||
/**
|
||||
* Forcibly shuts down the server. The server will stop receiving new calls
|
||||
* and cancel all pending calls. When it returns, the server has shut down.
|
||||
* This method is idempotent with itself and tryShutdown, and it will trigger
|
||||
* any outstanding tryShutdown callbacks.
|
||||
*/
|
||||
this.forceShutdown = function() {
|
||||
server.forceShutdown();
|
||||
};
|
||||
}
|
||||
|
||||
|
|
|
@ -61,7 +61,7 @@ describe('call', function() {
|
|||
channel = new grpc.Channel('localhost:' + port, insecureCreds);
|
||||
});
|
||||
after(function() {
|
||||
server.shutdown();
|
||||
server.forceShutdown();
|
||||
});
|
||||
describe('constructor', function() {
|
||||
it('should reject anything less than 3 arguments', function() {
|
||||
|
|
|
@ -70,7 +70,7 @@ describe('end-to-end', function() {
|
|||
channel = new grpc.Channel('localhost:' + port_num, insecureCreds);
|
||||
});
|
||||
after(function() {
|
||||
server.shutdown();
|
||||
server.forceShutdown();
|
||||
});
|
||||
it('should start and end a request without error', function(complete) {
|
||||
var done = multiDone(complete, 2);
|
||||
|
|
|
@ -57,7 +57,7 @@ describe('Health Checking', function() {
|
|||
grpc.Credentials.createInsecure());
|
||||
});
|
||||
after(function() {
|
||||
healthServer.shutdown();
|
||||
healthServer.forceShutdown();
|
||||
});
|
||||
it('should say an enabled service is SERVING', function(done) {
|
||||
healthClient.check({service: ''}, function(err, response) {
|
||||
|
|
|
@ -51,7 +51,7 @@ describe('Interop tests', function() {
|
|||
done();
|
||||
});
|
||||
after(function() {
|
||||
server.shutdown();
|
||||
server.forceShutdown();
|
||||
});
|
||||
// This depends on not using a binary stream
|
||||
it('should pass empty_unary', function(done) {
|
||||
|
|
|
@ -59,7 +59,7 @@ describe('Math client', function() {
|
|||
done();
|
||||
});
|
||||
after(function() {
|
||||
server.shutdown();
|
||||
server.forceShutdown();
|
||||
});
|
||||
it('should handle a single request', function(done) {
|
||||
var arg = {dividend: 7, divisor: 4};
|
||||
|
|
|
@ -92,7 +92,7 @@ describe('server', function() {
|
|||
server.addHttp2Port('0.0.0.0:0', grpc.ServerCredentials.createInsecure());
|
||||
});
|
||||
after(function() {
|
||||
server.shutdown();
|
||||
server.forceShutdown();
|
||||
});
|
||||
it('should start without error', function() {
|
||||
assert.doesNotThrow(function() {
|
||||
|
@ -100,4 +100,33 @@ describe('server', function() {
|
|||
});
|
||||
});
|
||||
});
|
||||
describe('shutdown', function() {
|
||||
var server;
|
||||
beforeEach(function() {
|
||||
server = new grpc.Server();
|
||||
server.addHttp2Port('0.0.0.0:0', grpc.ServerCredentials.createInsecure());
|
||||
server.start();
|
||||
});
|
||||
afterEach(function() {
|
||||
server.forceShutdown();
|
||||
});
|
||||
it('tryShutdown should shutdown successfully', function(done) {
|
||||
server.tryShutdown(done);
|
||||
});
|
||||
it('forceShutdown should shutdown successfully', function() {
|
||||
server.forceShutdown();
|
||||
});
|
||||
it('tryShutdown should be idempotent', function(done) {
|
||||
server.tryShutdown(done);
|
||||
server.tryShutdown(function() {});
|
||||
});
|
||||
it('forceShutdown should be idempotent', function() {
|
||||
server.forceShutdown();
|
||||
server.forceShutdown();
|
||||
});
|
||||
it('forceShutdown should trigger tryShutdown', function(done) {
|
||||
server.tryShutdown(done);
|
||||
server.forceShutdown();
|
||||
});
|
||||
});
|
||||
});
|
||||
|
|
|
@ -104,7 +104,7 @@ describe('Server.prototype.addProtoService', function() {
|
|||
server = new grpc.Server();
|
||||
});
|
||||
afterEach(function() {
|
||||
server.shutdown();
|
||||
server.forceShutdown();
|
||||
});
|
||||
it('Should succeed with a single service', function() {
|
||||
assert.doesNotThrow(function() {
|
||||
|
@ -148,7 +148,7 @@ describe('Client#$waitForReady', function() {
|
|||
client = new Client('localhost:' + port, grpc.Credentials.createInsecure());
|
||||
});
|
||||
after(function() {
|
||||
server.shutdown();
|
||||
server.forceShutdown();
|
||||
});
|
||||
it('should complete when called alone', function(done) {
|
||||
client.$waitForReady(Infinity, function(error) {
|
||||
|
@ -203,7 +203,7 @@ describe('Echo service', function() {
|
|||
server.start();
|
||||
});
|
||||
after(function() {
|
||||
server.shutdown();
|
||||
server.forceShutdown();
|
||||
});
|
||||
it('should echo the recieved message directly', function(done) {
|
||||
client.echo({value: 'test value', value2: 3}, function(error, response) {
|
||||
|
@ -248,7 +248,7 @@ describe('Generic client and server', function() {
|
|||
grpc.Credentials.createInsecure());
|
||||
});
|
||||
after(function() {
|
||||
server.shutdown();
|
||||
server.forceShutdown();
|
||||
});
|
||||
it('Should respond with a capitalized string', function(done) {
|
||||
client.capitalize('abc', function(err, response) {
|
||||
|
@ -296,7 +296,7 @@ describe('Echo metadata', function() {
|
|||
server.start();
|
||||
});
|
||||
after(function() {
|
||||
server.shutdown();
|
||||
server.forceShutdown();
|
||||
});
|
||||
it('with unary call', function(done) {
|
||||
var call = client.unary({}, function(err, data) {
|
||||
|
@ -419,7 +419,7 @@ describe('Other conditions', function() {
|
|||
server.start();
|
||||
});
|
||||
after(function() {
|
||||
server.shutdown();
|
||||
server.forceShutdown();
|
||||
});
|
||||
it('channel.getTarget should be available', function() {
|
||||
assert.strictEqual(typeof client.channel.getTarget(), 'string');
|
||||
|
@ -681,7 +681,7 @@ describe('Other conditions', function() {
|
|||
});
|
||||
afterEach(function() {
|
||||
console.log('Shutting down server');
|
||||
proxy.shutdown();
|
||||
proxy.forceShutdown();
|
||||
});
|
||||
describe('Cancellation', function() {
|
||||
it('With a unary call', function(done) {
|
||||
|
@ -847,7 +847,7 @@ describe('Cancelling surface client', function() {
|
|||
server.start();
|
||||
});
|
||||
after(function() {
|
||||
server.shutdown();
|
||||
server.forceShutdown();
|
||||
});
|
||||
it('Should correctly cancel a unary call', function(done) {
|
||||
var call = client.div({'divisor': 0, 'dividend': 0}, function(err, resp) {
|
||||
|
|
Loading…
Reference in New Issue