Add a libuv endpoint to the C core, for use in the Node library

This commit is contained in:
murgatroid99 2016-09-16 13:25:08 -07:00
parent b4828c3762
commit 4159622a60
14 changed files with 371 additions and 67 deletions

View File

@ -45,6 +45,7 @@
#include "byte_buffer.h"
#include "call.h"
#include "channel.h"
#include "completion_queue.h"
#include "completion_queue_async_worker.h"
#include "call_credentials.h"
#include "timeval.h"
@ -222,6 +223,9 @@ class SendMetadataOp : public Op {
out->data.send_initial_metadata.metadata = array.metadata;
return true;
}
bool IsFinalOp() {
return false;
}
protected:
std::string GetTypeString() const {
return "send_metadata";
@ -263,6 +267,9 @@ class SendMessageOp : public Op {
resources->handles.push_back(unique_ptr<PersistentValue>(handle));
return true;
}
bool IsFinalOp() {
return false;
}
protected:
std::string GetTypeString() const {
return "send_message";
@ -281,6 +288,9 @@ class SendClientCloseOp : public Op {
shared_ptr<Resources> resources) {
return true;
}
bool IsFinalOp() {
return false;
}
protected:
std::string GetTypeString() const {
return "client_close";
@ -341,6 +351,9 @@ class SendServerStatusOp : public Op {
out->data.send_status_from_server.status_details = **str;
return true;
}
bool IsFinalOp() {
return true;
}
protected:
std::string GetTypeString() const {
return "send_status";
@ -367,6 +380,9 @@ class GetMetadataOp : public Op {
out->data.recv_initial_metadata = &recv_metadata;
return true;
}
bool IsFinalOp() {
return false;
}
protected:
std::string GetTypeString() const {
@ -397,6 +413,9 @@ class ReadMessageOp : public Op {
out->data.recv_message = &recv_message;
return true;
}
bool IsFinalOp() {
return false;
}
protected:
std::string GetTypeString() const {
@ -442,6 +461,9 @@ class ClientStatusOp : public Op {
ParseMetadata(&metadata_array));
return scope.Escape(status_obj);
}
bool IsFinalOp() {
return true;
}
protected:
std::string GetTypeString() const {
return "status";
@ -465,6 +487,9 @@ class ServerCloseResponseOp : public Op {
out->data.recv_close_on_server.cancelled = &cancelled;
return true;
}
bool IsFinalOp() {
return false;
}
protected:
std::string GetTypeString() const {
@ -476,8 +501,8 @@ class ServerCloseResponseOp : public Op {
};
tag::tag(Callback *callback, OpVec *ops,
shared_ptr<Resources> resources) :
callback(callback), ops(ops), resources(resources){
shared_ptr<Resources> resources, Call *call) :
callback(callback), ops(ops), resources(resources), call(call){
}
tag::~tag() {
@ -502,17 +527,37 @@ Callback *GetTagCallback(void *tag) {
return tag_struct->callback;
}
void CompleteTag(void *tag) {
struct tag *tag_struct = reinterpret_cast<struct tag *>(tag);
bool is_final_op = false;
if (tag_struct->call == NULL) {
return;
}
for (vector<unique_ptr<Op> >::iterator it = tag_struct->ops->begin();
it != tag_struct->ops->end(); ++it) {
Op *op_ptr = it->get();
if (op_ptr->IsFinalOp()) {
is_final_op = true;
}
}
tag_struct->call->CompleteBatch(is_final_op);
}
void DestroyTag(void *tag) {
struct tag *tag_struct = reinterpret_cast<struct tag *>(tag);
delete tag_struct;
}
Call::Call(grpc_call *call) : wrapped_call(call) {
Call::Call(grpc_call *call) : wrapped_call(call),
pending_batches(0),
has_final_op_completed(false) {
}
Call::~Call() {
if (wrapped_call != NULL) {
grpc_call_destroy(wrapped_call);
}
}
void Call::Init(Local<Object> exports) {
HandleScope scope;
@ -552,6 +597,17 @@ Local<Value> Call::WrapStruct(grpc_call *call) {
}
}
void Call::CompleteBatch(bool is_final_op) {
if (is_final_op) {
this->has_final_op_completed = true;
}
this->pending_batches--;
if (this->has_final_op_completed && this->pending_batches == 0) {
grpc_call_destroy(this->wrapped_call);
this->wrapped_call = NULL;
}
}
NAN_METHOD(Call::New) {
if (info.IsConstructCall()) {
Call *call;
@ -602,12 +658,12 @@ NAN_METHOD(Call::New) {
Utf8String host_override(info[3]);
wrapped_call = grpc_channel_create_call(
wrapped_channel, parent_call, propagate_flags,
CompletionQueueAsyncWorker::GetQueue(), *method,
GetCompletionQueue(), *method,
*host_override, MillisecondsToTimespec(deadline), NULL);
} else if (info[3]->IsUndefined() || info[3]->IsNull()) {
wrapped_call = grpc_channel_create_call(
wrapped_channel, parent_call, propagate_flags,
CompletionQueueAsyncWorker::GetQueue(), *method,
GetCompletionQueue(), *method,
NULL, MillisecondsToTimespec(deadline), NULL);
} else {
return Nan::ThrowTypeError("Call's fourth argument must be a string");
@ -697,11 +753,12 @@ NAN_METHOD(Call::StartBatch) {
Callback *callback = new Callback(callback_func);
grpc_call_error error = grpc_call_start_batch(
call->wrapped_call, &ops[0], nops, new struct tag(
callback, op_vector.release(), resources), NULL);
callback, op_vector.release(), resources, call), NULL);
if (error != GRPC_CALL_OK) {
return Nan::ThrowError(nanErrorWithCode("startBatch failed", error));
}
CompletionQueueAsyncWorker::Next();
call->pending_batches++;
CompletionQueueNext();
}
NAN_METHOD(Call::Cancel) {

View File

@ -66,34 +66,6 @@ bool CreateMetadataArray(v8::Local<v8::Object> metadata,
grpc_metadata_array *array,
shared_ptr<Resources> resources);
class Op {
public:
virtual v8::Local<v8::Value> GetNodeValue() const = 0;
virtual bool ParseOp(v8::Local<v8::Value> value, grpc_op *out,
shared_ptr<Resources> resources) = 0;
virtual ~Op();
v8::Local<v8::Value> GetOpType() const;
protected:
virtual std::string GetTypeString() const = 0;
};
typedef std::vector<unique_ptr<Op>> OpVec;
struct tag {
tag(Nan::Callback *callback, OpVec *ops,
shared_ptr<Resources> resources);
~tag();
Nan::Callback *callback;
OpVec *ops;
shared_ptr<Resources> resources;
};
v8::Local<v8::Value> GetTagNodeValue(void *tag);
Nan::Callback *GetTagCallback(void *tag);
void DestroyTag(void *tag);
/* Wrapper class for grpc_call structs. */
class Call : public Nan::ObjectWrap {
public:
@ -102,6 +74,8 @@ class Call : public Nan::ObjectWrap {
/* Wrap a grpc_call struct in a javascript object */
static v8::Local<v8::Value> WrapStruct(grpc_call *call);
void CompleteBatch(bool is_final_op);
private:
explicit Call(grpc_call *call);
~Call();
@ -121,8 +95,46 @@ class Call : public Nan::ObjectWrap {
static Nan::Persistent<v8::FunctionTemplate> fun_tpl;
grpc_call *wrapped_call;
// The number of ops that were started but not completed on this call
int pending_batches;
/* Indicates whether the "final" op on a call has completed. For a client
call, this is GRPC_OP_RECV_STATUS_ON_CLIENT and for a server call, this
is GRPC_OP_SEND_STATUS_FROM_SERVER */
bool has_final_op_completed;
};
class Op {
public:
virtual v8::Local<v8::Value> GetNodeValue() const = 0;
virtual bool ParseOp(v8::Local<v8::Value> value, grpc_op *out,
shared_ptr<Resources> resources) = 0;
virtual ~Op();
v8::Local<v8::Value> GetOpType() const;
virtual bool IsFinalOp() = 0;
protected:
virtual std::string GetTypeString() const = 0;
};
typedef std::vector<unique_ptr<Op>> OpVec;
struct tag {
tag(Nan::Callback *callback, OpVec *ops,
shared_ptr<Resources> resources, Call *call);
~tag();
Nan::Callback *callback;
OpVec *ops;
shared_ptr<Resources> resources;
Call *call;
};
v8::Local<v8::Value> GetTagNodeValue(void *tag);
Nan::Callback *GetTagCallback(void *tag);
void DestroyTag(void *tag);
void CompleteTag(void *tag);
} // namespace node
} // namespace grpc

View File

@ -41,6 +41,7 @@
#include "grpc/grpc_security.h"
#include "call.h"
#include "channel.h"
#include "completion_queue.h"
#include "completion_queue_async_worker.h"
#include "channel_credentials.h"
#include "timeval.h"
@ -140,6 +141,7 @@ void DeallocateChannelArgs(grpc_channel_args *channel_args) {
Channel::Channel(grpc_channel *channel) : wrapped_channel(channel) {}
Channel::~Channel() {
gpr_log(GPR_DEBUG, "Destroying channel");
if (wrapped_channel != NULL) {
grpc_channel_destroy(wrapped_channel);
}
@ -276,11 +278,11 @@ NAN_METHOD(Channel::WatchConnectivityState) {
unique_ptr<OpVec> ops(new OpVec());
grpc_channel_watch_connectivity_state(
channel->wrapped_channel, last_state, MillisecondsToTimespec(deadline),
CompletionQueueAsyncWorker::GetQueue(),
GetCompletionQueue(),
new struct tag(callback,
ops.release(),
shared_ptr<Resources>(nullptr)));
CompletionQueueAsyncWorker::Next();
shared_ptr<Resources>(nullptr), NULL));
CompletionQueueNext();
}
} // namespace node

114
ext/completion_queue.cc Normal file
View File

@ -0,0 +1,114 @@
/*
*
* Copyright 2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#include <uv.h>
#include <node.h>
#include <v8.h>
#include <grpc/grpc.h>
#include "call.h"
#include "completion_queue.h"
#include "completion_queue_async_worker.h"
namespace grpc {
namespace node {
using v8::Local;
using v8::Object;
using v8::Value;
grpc_completion_queue *queue;
uv_prepare_t prepare;
int pending_batches;
void drain_completion_queue(uv_prepare_t *handle) {
Nan::HandleScope scope;
grpc_event event;
(void)handle;
do {
event = grpc_completion_queue_next(
queue, gpr_inf_past(GPR_CLOCK_MONOTONIC), NULL);
if (event.type == GRPC_OP_COMPLETE) {
Nan::Callback *callback = grpc::node::GetTagCallback(event.tag);
if (event.success) {
Local<Value> argv[] = {Nan::Null(),
grpc::node::GetTagNodeValue(event.tag)};
callback->Call(2, argv);
} else {
Local<Value> argv[] = {Nan::Error(
"The async function encountered an error")};
callback->Call(1, argv);
}
grpc::node::CompleteTag(event.tag);
grpc::node::DestroyTag(event.tag);
pending_batches--;
if (pending_batches == 0) {
uv_prepare_stop(&prepare);
}
}
} while (event.type != GRPC_QUEUE_TIMEOUT);
}
grpc_completion_queue *GetCompletionQueue() {
#ifdef GRPC_UV
return queue;
#else
return CompletionQueueAsyncWorker::GetQueue();
#endif
}
void CompletionQueueNext() {
#ifdef GRPC_UV
if (pending_batches == 0) {
GPR_ASSERT(!uv_is_active((uv_handle_t *)&prepare));
uv_prepare_start(&prepare, drain_completion_queue);
}
pending_batches++;
#else
CompletionQueueAsyncWorker::Next();
#endif
}
void CompletionQueueInit(Local<Object> exports) {
#ifdef GRPC_UV
queue = grpc_completion_queue_create(NULL);
uv_prepare_init(uv_default_loop(), &prepare);
pending_batches = 0;
#else
CompletionQueueAsyncWorker::Init(exports);
#endif
}
} // namespace node
} // namespace grpc

46
ext/completion_queue.h Normal file
View File

@ -0,0 +1,46 @@
/*
*
* Copyright 2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#include <v8.h>
namespace grpc {
namespace node {
grpc_completion_queue *GetCompletionQueue();
void CompletionQueueNext();
void CompletionQueueInit(v8::Local<v8::Object> exports);
} // namespace node
} // namespace grpc

View File

@ -74,6 +74,7 @@ void CompletionQueueAsyncWorker::Execute() {
grpc_completion_queue *CompletionQueueAsyncWorker::GetQueue() { return queue; }
void CompletionQueueAsyncWorker::Next() {
#ifndef GRPC_UV
Nan::HandleScope scope;
if (current_threads < max_queue_threads) {
current_threads += 1;
@ -85,6 +86,7 @@ void CompletionQueueAsyncWorker::Next() {
GPR_ASSERT(current_threads <= max_queue_threads);
GPR_ASSERT((current_threads == max_queue_threads) ||
(waiting_next_calls == 0));
#endif
}
void CompletionQueueAsyncWorker::Init(Local<Object> exports) {

View File

@ -50,6 +50,7 @@
#include "completion_queue_async_worker.h"
#include "server_credentials.h"
#include "timeval.h"
#include "completion_queue.h"
using v8::FunctionTemplate;
using v8::Local;
@ -261,8 +262,8 @@ void InitLogConstants(Local<Object> exports) {
Nan::HandleScope scope;
Local<Object> log_verbosity = Nan::New<Object>();
Nan::Set(exports, Nan::New("logVerbosity").ToLocalChecked(), log_verbosity);
Local<Value> DEBUG(Nan::New<Uint32, uint32_t>(GPR_LOG_SEVERITY_DEBUG));
Nan::Set(log_verbosity, Nan::New("DEBUG").ToLocalChecked(), DEBUG);
Local<Value> DEBUG_LOG(Nan::New<Uint32, uint32_t>(GPR_LOG_SEVERITY_DEBUG));
Nan::Set(log_verbosity, Nan::New("DEBUG").ToLocalChecked(), DEBUG_LOG);
Local<Value> INFO(Nan::New<Uint32, uint32_t>(GPR_LOG_SEVERITY_INFO));
Nan::Set(log_verbosity, Nan::New("INFO").ToLocalChecked(), INFO);
Local<Value> LOG_ERROR(Nan::New<Uint32, uint32_t>(GPR_LOG_SEVERITY_ERROR));
@ -414,6 +415,12 @@ NAN_METHOD(SetLogVerbosity) {
gpr_set_log_verbosity(severity);
}
uv_signal_t signal_handle;
void signal_callback(uv_signal_t *handle, int signum) {
uv_print_all_handles(uv_default_loop(), stderr);
}
void init(Local<Object> exports) {
Nan::HandleScope scope;
grpc_init();
@ -428,14 +435,20 @@ void init(Local<Object> exports) {
InitWriteFlags(exports);
InitLogConstants(exports);
uv_signal_init(uv_default_loop(), &signal_handle);
uv_signal_start(&signal_handle, signal_callback, SIGUSR2);
uv_unref((uv_handle_t *)&signal_handle);
grpc::node::Call::Init(exports);
grpc::node::CallCredentials::Init(exports);
grpc::node::Channel::Init(exports);
grpc::node::ChannelCredentials::Init(exports);
grpc::node::Server::Init(exports);
grpc::node::CompletionQueueAsyncWorker::Init(exports);
grpc::node::ServerCredentials::Init(exports);
grpc::node::CompletionQueueInit(exports);
// Attach a few utility functions directly to the module
Nan::Set(exports, Nan::New("metadataKeyIsLegal").ToLocalChecked(),
Nan::GetFunction(

View File

@ -40,6 +40,7 @@
#include <vector>
#include "call.h"
#include "completion_queue.h"
#include "completion_queue_async_worker.h"
#include "grpc/grpc.h"
#include "grpc/grpc_security.h"
@ -64,6 +65,7 @@ using v8::Array;
using v8::Boolean;
using v8::Date;
using v8::Exception;
using v8::External;
using v8::Function;
using v8::FunctionTemplate;
using v8::Local;
@ -75,6 +77,8 @@ using v8::Value;
Nan::Callback *Server::constructor;
Persistent<FunctionTemplate> Server::fun_tpl;
static Callback *shutdown_callback;
class NewCallOp : public Op {
public:
NewCallOp() {
@ -111,6 +115,9 @@ class NewCallOp : public Op {
shared_ptr<Resources> resources) {
return true;
}
bool IsFinalOp() {
return false;
}
grpc_call *call;
grpc_call_details details;
@ -120,17 +127,50 @@ class NewCallOp : public Op {
std::string GetTypeString() const { return "new_call"; }
};
class ServerShutdownOp : public Op {
public:
ServerShutdownOp(grpc_server *server): server(server) {
}
~ServerShutdownOp() {
}
Local<Value> GetNodeValue() const {
return Nan::New<External>(reinterpret_cast<void *>(server));
}
bool ParseOp(Local<Value> value, grpc_op *out,
shared_ptr<Resources> resources) {
return true;
}
bool IsFinalOp() {
return false;
}
grpc_server *server;
protected:
std::string GetTypeString() const { return "shutdown"; }
};
NAN_METHOD(ServerShutdownCallback) {
if (!info[0]->IsNull()) {
return Nan::ThrowError("forceShutdown failed somehow");
}
MaybeLocal<Object> maybe_result = Nan::To<Object>(info[1]);
Local<Object> result = maybe_result.ToLocalChecked();
Local<Value> server_val = Nan::Get(
result, Nan::New("shutdown").ToLocalChecked()).ToLocalChecked();
Local<External> server_extern = server_val.As<External>();
grpc_server *server = reinterpret_cast<grpc_server *>(server_extern->Value());
grpc_server_destroy(server);
}
Server::Server(grpc_server *server) : wrapped_server(server) {
shutdown_queue = grpc_completion_queue_create(NULL);
grpc_server_register_non_listening_completion_queue(server, shutdown_queue,
NULL);
}
Server::~Server() {
this->ShutdownServer();
grpc_completion_queue_shutdown(this->shutdown_queue);
grpc_server_destroy(this->wrapped_server);
grpc_completion_queue_destroy(this->shutdown_queue);
}
void Server::Init(Local<Object> exports) {
@ -147,6 +187,11 @@ void Server::Init(Local<Object> exports) {
Local<Function> ctr = Nan::GetFunction(tpl).ToLocalChecked();
Nan::Set(exports, Nan::New("Server").ToLocalChecked(), ctr);
constructor = new Callback(ctr);
Local<FunctionTemplate>callback_tpl =
Nan::New<FunctionTemplate>(ServerShutdownCallback);
shutdown_callback = new Callback(
Nan::GetFunction(callback_tpl).ToLocalChecked());
}
bool Server::HasInstance(Local<Value> val) {
@ -155,11 +200,19 @@ bool Server::HasInstance(Local<Value> val) {
}
void Server::ShutdownServer() {
grpc_server_shutdown_and_notify(this->wrapped_server, this->shutdown_queue,
NULL);
if (this->wrapped_server != NULL) {
ServerShutdownOp *op = new ServerShutdownOp(this->wrapped_server);
unique_ptr<OpVec> ops(new OpVec());
ops->push_back(unique_ptr<Op>(op));
grpc_server_shutdown_and_notify(
this->wrapped_server, GetCompletionQueue(),
new struct tag(new Callback(**shutdown_callback), ops.release(),
shared_ptr<Resources>(nullptr), NULL));
grpc_server_cancel_all_calls(this->wrapped_server);
grpc_completion_queue_pluck(this->shutdown_queue, NULL,
gpr_inf_future(GPR_CLOCK_REALTIME), NULL);
CompletionQueueNext();
this->wrapped_server = NULL;
}
}
NAN_METHOD(Server::New) {
@ -179,7 +232,7 @@ NAN_METHOD(Server::New) {
}
}
grpc_server *wrapped_server;
grpc_completion_queue *queue = CompletionQueueAsyncWorker::GetQueue();
grpc_completion_queue *queue = GetCompletionQueue();
grpc_channel_args *channel_args;
if (!ParseChannelArgs(info[0], &channel_args)) {
DeallocateChannelArgs(channel_args);
@ -205,14 +258,14 @@ NAN_METHOD(Server::RequestCall) {
ops->push_back(unique_ptr<Op>(op));
grpc_call_error error = grpc_server_request_call(
server->wrapped_server, &op->call, &op->details, &op->request_metadata,
CompletionQueueAsyncWorker::GetQueue(),
CompletionQueueAsyncWorker::GetQueue(),
GetCompletionQueue(),
GetCompletionQueue(),
new struct tag(new Callback(info[0].As<Function>()), ops.release(),
shared_ptr<Resources>(nullptr)));
shared_ptr<Resources>(nullptr), NULL));
if (error != GRPC_CALL_OK) {
return Nan::ThrowError(nanErrorWithCode("requestCall failed", error));
}
CompletionQueueAsyncWorker::Next();
CompletionQueueNext();
}
NAN_METHOD(Server::AddHttp2Port) {
@ -259,10 +312,10 @@ NAN_METHOD(Server::TryShutdown) {
Server *server = ObjectWrap::Unwrap<Server>(info.This());
unique_ptr<OpVec> ops(new OpVec());
grpc_server_shutdown_and_notify(
server->wrapped_server, CompletionQueueAsyncWorker::GetQueue(),
server->wrapped_server, GetCompletionQueue(),
new struct tag(new Nan::Callback(info[0].As<Function>()), ops.release(),
shared_ptr<Resources>(nullptr)));
CompletionQueueAsyncWorker::Next();
shared_ptr<Resources>(nullptr), NULL));
CompletionQueueNext();
}
NAN_METHOD(Server::ForceShutdown) {

View File

@ -73,7 +73,6 @@ class Server : public Nan::ObjectWrap {
static Nan::Persistent<v8::FunctionTemplate> fun_tpl;
grpc_server *wrapped_server;
grpc_completion_queue *shutdown_queue;
};
} // namespace node

View File

@ -169,18 +169,18 @@ NAN_METHOD(ServerCredentials::CreateSsl) {
for(uint32_t i = 0; i < key_cert_pair_count; i++) {
Local<Value> pair_val = Nan::Get(pair_list, i).ToLocalChecked();
if (!pair_val->IsObject()) {
delete key_cert_pairs;
delete[] key_cert_pairs;
return Nan::ThrowTypeError("Key/cert pairs must be objects");
}
Local<Object> pair_obj = Nan::To<Object>(pair_val).ToLocalChecked();
Local<Value> maybe_key = Nan::Get(pair_obj, key_key).ToLocalChecked();
Local<Value> maybe_cert = Nan::Get(pair_obj, cert_key).ToLocalChecked();
if (!::node::Buffer::HasInstance(maybe_key)) {
delete key_cert_pairs;
delete[] key_cert_pairs;
return Nan::ThrowTypeError("private_key must be a Buffer");
}
if (!::node::Buffer::HasInstance(maybe_cert)) {
delete key_cert_pairs;
delete[] key_cert_pairs;
return Nan::ThrowTypeError("cert_chain must be a Buffer");
}
key_cert_pairs[i].private_key = ::node::Buffer::Data(maybe_key);
@ -189,7 +189,7 @@ NAN_METHOD(ServerCredentials::CreateSsl) {
grpc_server_credentials *creds = grpc_ssl_server_credentials_create_ex(
root_certs, key_cert_pairs, key_cert_pair_count,
client_certificate_request, NULL);
delete key_cert_pairs;
delete[] key_cert_pairs;
if (creds == NULL) {
info.GetReturnValue().SetNull();
} else {

View File

@ -219,3 +219,7 @@ exports.getClientChannel = client.getClientChannel;
* @see module:src/client.waitForClientReady
*/
exports.waitForClientReady = client.waitForClientReady;
exports.closeClient = function closeClient(client_obj) {
client.getClientChannel(client_obj).close();
};

View File

@ -382,6 +382,7 @@ function makeUnaryRequestFunction(method, serialize, deserialize) {
if (args.options) {
message.grpcWriteFlags = args.options.flags;
}
client_batch[grpc.opType.SEND_INITIAL_METADATA] =
metadata._getCoreRepresentation();
client_batch[grpc.opType.SEND_MESSAGE] = message;

View File

@ -31,7 +31,7 @@
*
*/
var binary = require('node-pre-gyp');
var binary = require('node-pre-gyp/lib/pre-binding');
var path = require('path');
var binding_path =
binary.find(path.resolve(path.join(__dirname, '../../../package.json')));

View File

@ -61,6 +61,7 @@ describe('Async functionality', function() {
done();
});
after(function() {
grpc.closeClient(math_client);
server.forceShutdown();
});
it('should not hang', function(done) {