Merge branch 'master' into new_tsi

This commit is contained in:
jiangtaoli2016 2017-04-15 10:20:51 -07:00
commit c52c60f7b7
14 changed files with 204 additions and 105 deletions

View File

@ -99,7 +99,6 @@ Local<Value> nanErrorWithCode(const char *msg, grpc_call_error code) {
bool CreateMetadataArray(Local<Object> metadata, grpc_metadata_array *array) { bool CreateMetadataArray(Local<Object> metadata, grpc_metadata_array *array) {
HandleScope scope; HandleScope scope;
grpc_metadata_array_init(array);
Local<Array> keys = Nan::GetOwnPropertyNames(metadata).ToLocalChecked(); Local<Array> keys = Nan::GetOwnPropertyNames(metadata).ToLocalChecked();
for (unsigned int i = 0; i < keys->Length(); i++) { for (unsigned int i = 0; i < keys->Length(); i++) {
Local<String> current_key = Nan::To<String>( Local<String> current_key = Nan::To<String>(
@ -111,18 +110,20 @@ bool CreateMetadataArray(Local<Object> metadata, grpc_metadata_array *array) {
array->capacity += Local<Array>::Cast(value_array)->Length(); array->capacity += Local<Array>::Cast(value_array)->Length();
} }
array->metadata = reinterpret_cast<grpc_metadata*>( array->metadata = reinterpret_cast<grpc_metadata*>(
gpr_malloc(array->capacity * sizeof(grpc_metadata))); gpr_zalloc(array->capacity * sizeof(grpc_metadata)));
for (unsigned int i = 0; i < keys->Length(); i++) { for (unsigned int i = 0; i < keys->Length(); i++) {
Local<String> current_key(Nan::To<String>(keys->Get(i)).ToLocalChecked()); Local<String> current_key(Nan::To<String>(keys->Get(i)).ToLocalChecked());
Local<Array> values = Local<Array>::Cast( Local<Array> values = Local<Array>::Cast(
Nan::Get(metadata, current_key).ToLocalChecked()); Nan::Get(metadata, current_key).ToLocalChecked());
grpc_slice key_slice = grpc_slice_intern(CreateSliceFromString(current_key)); grpc_slice key_slice = CreateSliceFromString(current_key);
grpc_slice key_intern_slice = grpc_slice_intern(key_slice);
grpc_slice_unref(key_slice);
for (unsigned int j = 0; j < values->Length(); j++) { for (unsigned int j = 0; j < values->Length(); j++) {
Local<Value> value = Nan::Get(values, j).ToLocalChecked(); Local<Value> value = Nan::Get(values, j).ToLocalChecked();
grpc_metadata *current = &array->metadata[array->count]; grpc_metadata *current = &array->metadata[array->count];
current->key = key_slice; current->key = key_intern_slice;
// Only allow binary headers for "-bin" keys // Only allow binary headers for "-bin" keys
if (grpc_is_binary_header(key_slice)) { if (grpc_is_binary_header(key_intern_slice)) {
if (::node::Buffer::HasInstance(value)) { if (::node::Buffer::HasInstance(value)) {
current->value = CreateSliceFromBuffer(value); current->value = CreateSliceFromBuffer(value);
} else { } else {
@ -142,6 +143,14 @@ bool CreateMetadataArray(Local<Object> metadata, grpc_metadata_array *array) {
return true; return true;
} }
void DestroyMetadataArray(grpc_metadata_array *array) {
for (size_t i = 0; i < array->count; i++) {
// Don't unref keys because they are interned
grpc_slice_unref(array->metadata[i].value);
}
grpc_metadata_array_destroy(array);
}
Local<Value> ParseMetadata(const grpc_metadata_array *metadata_array) { Local<Value> ParseMetadata(const grpc_metadata_array *metadata_array) {
EscapableHandleScope scope; EscapableHandleScope scope;
grpc_metadata *metadata_elements = metadata_array->metadata; grpc_metadata *metadata_elements = metadata_array->metadata;
@ -179,6 +188,12 @@ Op::~Op() {
class SendMetadataOp : public Op { class SendMetadataOp : public Op {
public: public:
SendMetadataOp() {
grpc_metadata_array_init(&send_metadata);
}
~SendMetadataOp() {
DestroyMetadataArray(&send_metadata);
}
Local<Value> GetNodeValue() const { Local<Value> GetNodeValue() const {
EscapableHandleScope scope; EscapableHandleScope scope;
return scope.Escape(Nan::True()); return scope.Escape(Nan::True());
@ -187,26 +202,29 @@ class SendMetadataOp : public Op {
if (!value->IsObject()) { if (!value->IsObject()) {
return false; return false;
} }
grpc_metadata_array array;
MaybeLocal<Object> maybe_metadata = Nan::To<Object>(value); MaybeLocal<Object> maybe_metadata = Nan::To<Object>(value);
if (maybe_metadata.IsEmpty()) { if (maybe_metadata.IsEmpty()) {
return false; return false;
} }
if (!CreateMetadataArray(maybe_metadata.ToLocalChecked(), if (!CreateMetadataArray(maybe_metadata.ToLocalChecked(),
&array)) { &send_metadata)) {
return false; return false;
} }
out->data.send_initial_metadata.count = array.count; out->data.send_initial_metadata.count = send_metadata.count;
out->data.send_initial_metadata.metadata = array.metadata; out->data.send_initial_metadata.metadata = send_metadata.metadata;
return true; return true;
} }
bool IsFinalOp() { bool IsFinalOp() {
return false; return false;
} }
void OnComplete(bool success) {
}
protected: protected:
std::string GetTypeString() const { std::string GetTypeString() const {
return "send_metadata"; return "send_metadata";
} }
private:
grpc_metadata_array send_metadata;
}; };
class SendMessageOp : public Op { class SendMessageOp : public Op {
@ -244,6 +262,8 @@ class SendMessageOp : public Op {
bool IsFinalOp() { bool IsFinalOp() {
return false; return false;
} }
void OnComplete(bool success) {
}
protected: protected:
std::string GetTypeString() const { std::string GetTypeString() const {
return "send_message"; return "send_message";
@ -264,6 +284,8 @@ class SendClientCloseOp : public Op {
bool IsFinalOp() { bool IsFinalOp() {
return false; return false;
} }
void OnComplete(bool success) {
}
protected: protected:
std::string GetTypeString() const { std::string GetTypeString() const {
return "client_close"; return "client_close";
@ -272,8 +294,12 @@ class SendClientCloseOp : public Op {
class SendServerStatusOp : public Op { class SendServerStatusOp : public Op {
public: public:
SendServerStatusOp() {
grpc_metadata_array_init(&status_metadata);
}
~SendServerStatusOp() { ~SendServerStatusOp() {
grpc_slice_unref(details); grpc_slice_unref(details);
DestroyMetadataArray(&status_metadata);
} }
Local<Value> GetNodeValue() const { Local<Value> GetNodeValue() const {
EscapableHandleScope scope; EscapableHandleScope scope;
@ -313,12 +339,13 @@ class SendServerStatusOp : public Op {
} }
Local<String> details = Nan::To<String>( Local<String> details = Nan::To<String>(
maybe_details.ToLocalChecked()).ToLocalChecked(); maybe_details.ToLocalChecked()).ToLocalChecked();
grpc_metadata_array array; if (!CreateMetadataArray(metadata, &status_metadata)) {
if (!CreateMetadataArray(metadata, &array)) {
return false; return false;
} }
out->data.send_status_from_server.trailing_metadata_count = array.count; out->data.send_status_from_server.trailing_metadata_count =
out->data.send_status_from_server.trailing_metadata = array.metadata; status_metadata.count;
out->data.send_status_from_server.trailing_metadata =
status_metadata.metadata;
out->data.send_status_from_server.status = out->data.send_status_from_server.status =
static_cast<grpc_status_code>(code); static_cast<grpc_status_code>(code);
this->details = CreateSliceFromString(details); this->details = CreateSliceFromString(details);
@ -328,6 +355,8 @@ class SendServerStatusOp : public Op {
bool IsFinalOp() { bool IsFinalOp() {
return true; return true;
} }
void OnComplete(bool success) {
}
protected: protected:
std::string GetTypeString() const { std::string GetTypeString() const {
return "send_status"; return "send_status";
@ -335,6 +364,7 @@ class SendServerStatusOp : public Op {
private: private:
grpc_slice details; grpc_slice details;
grpc_metadata_array status_metadata;
}; };
class GetMetadataOp : public Op { class GetMetadataOp : public Op {
@ -359,6 +389,8 @@ class GetMetadataOp : public Op {
bool IsFinalOp() { bool IsFinalOp() {
return false; return false;
} }
void OnComplete(bool success) {
}
protected: protected:
std::string GetTypeString() const { std::string GetTypeString() const {
@ -391,6 +423,8 @@ class ReadMessageOp : public Op {
bool IsFinalOp() { bool IsFinalOp() {
return false; return false;
} }
void OnComplete(bool success) {
}
protected: protected:
std::string GetTypeString() const { std::string GetTypeString() const {
@ -432,6 +466,8 @@ class ClientStatusOp : public Op {
bool IsFinalOp() { bool IsFinalOp() {
return true; return true;
} }
void OnComplete(bool success) {
}
protected: protected:
std::string GetTypeString() const { std::string GetTypeString() const {
return "status"; return "status";
@ -456,6 +492,8 @@ class ServerCloseResponseOp : public Op {
bool IsFinalOp() { bool IsFinalOp() {
return false; return false;
} }
void OnComplete(bool success) {
}
protected: protected:
std::string GetTypeString() const { std::string GetTypeString() const {
@ -466,8 +504,10 @@ class ServerCloseResponseOp : public Op {
int cancelled; int cancelled;
}; };
tag::tag(Callback *callback, OpVec *ops, Call *call) : tag::tag(Callback *callback, OpVec *ops, Call *call, Local<Value> call_value) :
callback(callback), ops(ops), call(call){ callback(callback), ops(ops), call(call){
HandleScope scope;
call_persist.Reset(call_value);
} }
tag::~tag() { tag::~tag() {
@ -475,36 +515,36 @@ tag::~tag() {
delete ops; delete ops;
} }
Local<Value> GetTagNodeValue(void *tag) { void CompleteTag(void *tag, const char *error_message) {
EscapableHandleScope scope; HandleScope scope;
struct tag *tag_struct = reinterpret_cast<struct tag *>(tag); struct tag *tag_struct = reinterpret_cast<struct tag *>(tag);
Callback *callback = tag_struct->callback;
if (error_message == NULL) {
Local<Object> tag_obj = Nan::New<Object>(); Local<Object> tag_obj = Nan::New<Object>();
for (vector<unique_ptr<Op> >::iterator it = tag_struct->ops->begin(); for (vector<unique_ptr<Op> >::iterator it = tag_struct->ops->begin();
it != tag_struct->ops->end(); ++it) { it != tag_struct->ops->end(); ++it) {
Op *op_ptr = it->get(); Op *op_ptr = it->get();
Nan::Set(tag_obj, op_ptr->GetOpType(), op_ptr->GetNodeValue()); Nan::Set(tag_obj, op_ptr->GetOpType(), op_ptr->GetNodeValue());
} }
return scope.Escape(tag_obj); Local<Value> argv[] = {Nan::Null(), tag_obj};
} callback->Call(2, argv);
} else {
Callback *GetTagCallback(void *tag) { Local<Value> argv[] = {Nan::Error(error_message)};
struct tag *tag_struct = reinterpret_cast<struct tag *>(tag); callback->Call(1, argv);
return tag_struct->callback;
}
void CompleteTag(void *tag) {
struct tag *tag_struct = reinterpret_cast<struct tag *>(tag);
bool is_final_op = false;
if (tag_struct->call == NULL) {
return;
} }
bool success = (error_message == NULL);
bool is_final_op = false;
for (vector<unique_ptr<Op> >::iterator it = tag_struct->ops->begin(); for (vector<unique_ptr<Op> >::iterator it = tag_struct->ops->begin();
it != tag_struct->ops->end(); ++it) { it != tag_struct->ops->end(); ++it) {
Op *op_ptr = it->get(); Op *op_ptr = it->get();
op_ptr->OnComplete(success);
if (op_ptr->IsFinalOp()) { if (op_ptr->IsFinalOp()) {
is_final_op = true; is_final_op = true;
} }
} }
if (tag_struct->call == NULL) {
return;
}
tag_struct->call->CompleteBatch(is_final_op); tag_struct->call->CompleteBatch(is_final_op);
} }
@ -513,15 +553,20 @@ void DestroyTag(void *tag) {
delete tag_struct; delete tag_struct;
} }
void Call::DestroyCall() {
if (this->wrapped_call != NULL) {
grpc_call_destroy(this->wrapped_call);
this->wrapped_call = NULL;
}
}
Call::Call(grpc_call *call) : wrapped_call(call), Call::Call(grpc_call *call) : wrapped_call(call),
pending_batches(0), pending_batches(0),
has_final_op_completed(false) { has_final_op_completed(false) {
} }
Call::~Call() { Call::~Call() {
if (wrapped_call != NULL) { DestroyCall();
grpc_call_destroy(wrapped_call);
}
} }
void Call::Init(Local<Object> exports) { void Call::Init(Local<Object> exports) {
@ -568,12 +613,19 @@ void Call::CompleteBatch(bool is_final_op) {
} }
this->pending_batches--; this->pending_batches--;
if (this->has_final_op_completed && this->pending_batches == 0) { if (this->has_final_op_completed && this->pending_batches == 0) {
grpc_call_destroy(this->wrapped_call); this->DestroyCall();
this->wrapped_call = NULL;
} }
} }
NAN_METHOD(Call::New) { NAN_METHOD(Call::New) {
/* Arguments:
* 0: Channel to make the call on
* 1: Method
* 2: Deadline
* 3: host
* 4: parent Call
* 5: propagation flags
*/
if (info.IsConstructCall()) { if (info.IsConstructCall()) {
Call *call; Call *call;
if (info[0]->IsExternal()) { if (info[0]->IsExternal()) {
@ -618,25 +670,26 @@ NAN_METHOD(Call::New) {
double deadline = Nan::To<double>(info[2]).FromJust(); double deadline = Nan::To<double>(info[2]).FromJust();
grpc_channel *wrapped_channel = channel->GetWrappedChannel(); grpc_channel *wrapped_channel = channel->GetWrappedChannel();
grpc_call *wrapped_call; grpc_call *wrapped_call;
grpc_slice method = CreateSliceFromString(
Nan::To<String>(info[1]).ToLocalChecked());
if (info[3]->IsString()) { if (info[3]->IsString()) {
grpc_slice *host = new grpc_slice; grpc_slice *host = new grpc_slice;
*host = CreateSliceFromString( *host = CreateSliceFromString(
Nan::To<String>(info[3]).ToLocalChecked()); Nan::To<String>(info[3]).ToLocalChecked());
wrapped_call = grpc_channel_create_call( wrapped_call = grpc_channel_create_call(
wrapped_channel, parent_call, propagate_flags, wrapped_channel, parent_call, propagate_flags,
GetCompletionQueue(), CreateSliceFromString( GetCompletionQueue(), method,
Nan::To<String>(info[1]).ToLocalChecked()),
host, MillisecondsToTimespec(deadline), NULL); host, MillisecondsToTimespec(deadline), NULL);
delete host; delete host;
} else if (info[3]->IsUndefined() || info[3]->IsNull()) { } else if (info[3]->IsUndefined() || info[3]->IsNull()) {
wrapped_call = grpc_channel_create_call( wrapped_call = grpc_channel_create_call(
wrapped_channel, parent_call, propagate_flags, wrapped_channel, parent_call, propagate_flags,
GetCompletionQueue(), CreateSliceFromString( GetCompletionQueue(), method,
Nan::To<String>(info[1]).ToLocalChecked()),
NULL, MillisecondsToTimespec(deadline), NULL); NULL, MillisecondsToTimespec(deadline), NULL);
} else { } else {
return Nan::ThrowTypeError("Call's fourth argument must be a string"); return Nan::ThrowTypeError("Call's fourth argument must be a string");
} }
grpc_slice_unref(method);
call = new Call(wrapped_call); call = new Call(wrapped_call);
Nan::Set(info.This(), Nan::New("channel_").ToLocalChecked(), Nan::Set(info.This(), Nan::New("channel_").ToLocalChecked(),
channel_object); channel_object);
@ -721,7 +774,7 @@ NAN_METHOD(Call::StartBatch) {
Callback *callback = new Callback(callback_func); Callback *callback = new Callback(callback_func);
grpc_call_error error = grpc_call_start_batch( grpc_call_error error = grpc_call_start_batch(
call->wrapped_call, &ops[0], nops, new struct tag( call->wrapped_call, &ops[0], nops, new struct tag(
callback, op_vector.release(), call), NULL); callback, op_vector.release(), call, info.This()), NULL);
if (error != GRPC_CALL_OK) { if (error != GRPC_CALL_OK) {
return Nan::ThrowError(nanErrorWithCode("startBatch failed", error)); return Nan::ThrowError(nanErrorWithCode("startBatch failed", error));
} }

View File

@ -58,6 +58,8 @@ v8::Local<v8::Value> ParseMetadata(const grpc_metadata_array *metadata_array);
bool CreateMetadataArray(v8::Local<v8::Object> metadata, bool CreateMetadataArray(v8::Local<v8::Object> metadata,
grpc_metadata_array *array); grpc_metadata_array *array);
void DestroyMetadataArray(grpc_metadata_array *array);
/* Wrapper class for grpc_call structs. */ /* Wrapper class for grpc_call structs. */
class Call : public Nan::ObjectWrap { class Call : public Nan::ObjectWrap {
public: public:
@ -76,6 +78,8 @@ class Call : public Nan::ObjectWrap {
Call(const Call &); Call(const Call &);
Call &operator=(const Call &); Call &operator=(const Call &);
void DestroyCall();
static NAN_METHOD(New); static NAN_METHOD(New);
static NAN_METHOD(StartBatch); static NAN_METHOD(StartBatch);
static NAN_METHOD(Cancel); static NAN_METHOD(Cancel);
@ -102,6 +106,7 @@ class Op {
virtual ~Op(); virtual ~Op();
v8::Local<v8::Value> GetOpType() const; v8::Local<v8::Value> GetOpType() const;
virtual bool IsFinalOp() = 0; virtual bool IsFinalOp() = 0;
virtual void OnComplete(bool success) = 0;
protected: protected:
virtual std::string GetTypeString() const = 0; virtual std::string GetTypeString() const = 0;
@ -109,20 +114,19 @@ class Op {
typedef std::vector<unique_ptr<Op>> OpVec; typedef std::vector<unique_ptr<Op>> OpVec;
struct tag { struct tag {
tag(Nan::Callback *callback, OpVec *ops, Call *call); tag(Nan::Callback *callback, OpVec *ops, Call *call,
v8::Local<v8::Value> call_value);
~tag(); ~tag();
Nan::Callback *callback; Nan::Callback *callback;
OpVec *ops; OpVec *ops;
Call *call; Call *call;
Nan::Persistent<v8::Value, Nan::CopyablePersistentTraits<v8::Value>>
call_persist;
}; };
v8::Local<v8::Value> GetTagNodeValue(void *tag);
Nan::Callback *GetTagCallback(void *tag);
void DestroyTag(void *tag); void DestroyTag(void *tag);
void CompleteTag(void *tag); void CompleteTag(void *tag, const char *error_message);
} // namespace node } // namespace node
} // namespace grpc } // namespace grpc

View File

@ -211,6 +211,7 @@ NAN_METHOD(PluginCallback) {
Utf8String details_utf8_str(info[1]); Utf8String details_utf8_str(info[1]);
char *details = *details_utf8_str; char *details = *details_utf8_str;
grpc_metadata_array array; grpc_metadata_array array;
grpc_metadata_array_init(&array);
Local<Object> callback_data = Nan::To<Object>(info[3]).ToLocalChecked(); Local<Object> callback_data = Nan::To<Object>(info[3]).ToLocalChecked();
if (!CreateMetadataArray(Nan::To<Object>(info[2]).ToLocalChecked(), if (!CreateMetadataArray(Nan::To<Object>(info[2]).ToLocalChecked(),
&array)){ &array)){
@ -226,6 +227,7 @@ NAN_METHOD(PluginCallback) {
Nan::New("user_data").ToLocalChecked() Nan::New("user_data").ToLocalChecked()
).ToLocalChecked().As<External>()->Value(); ).ToLocalChecked().As<External>()->Value();
cb(user_data, array.metadata, array.count, code, details); cb(user_data, array.metadata, array.count, code, details);
DestroyMetadataArray(&array);
} }
NAUV_WORK_CB(SendPluginCallback) { NAUV_WORK_CB(SendPluginCallback) {

View File

@ -280,7 +280,7 @@ NAN_METHOD(Channel::WatchConnectivityState) {
channel->wrapped_channel, last_state, MillisecondsToTimespec(deadline), channel->wrapped_channel, last_state, MillisecondsToTimespec(deadline),
GetCompletionQueue(), GetCompletionQueue(),
new struct tag(callback, new struct tag(callback,
ops.release(), NULL)); ops.release(), NULL, Nan::Null()));
CompletionQueueNext(); CompletionQueueNext();
} }

View File

@ -34,14 +34,14 @@
/* I don't like using #ifndef, but I don't see a better way to do this */ /* I don't like using #ifndef, but I don't see a better way to do this */
#ifndef GRPC_UV #ifndef GRPC_UV
#include <node.h>
#include <nan.h> #include <nan.h>
#include <node.h>
#include "call.h"
#include "completion_queue.h"
#include "grpc/grpc.h" #include "grpc/grpc.h"
#include "grpc/support/log.h" #include "grpc/support/log.h"
#include "grpc/support/time.h" #include "grpc/support/time.h"
#include "completion_queue.h"
#include "call.h"
namespace grpc { namespace grpc {
namespace node { namespace node {
@ -111,8 +111,8 @@ CompletionQueueAsyncWorker::CompletionQueueAsyncWorker()
CompletionQueueAsyncWorker::~CompletionQueueAsyncWorker() {} CompletionQueueAsyncWorker::~CompletionQueueAsyncWorker() {}
void CompletionQueueAsyncWorker::Execute() { void CompletionQueueAsyncWorker::Execute() {
result = result = grpc_completion_queue_next(queue, gpr_inf_future(GPR_CLOCK_REALTIME),
grpc_completion_queue_next(queue, gpr_inf_future(GPR_CLOCK_REALTIME), NULL); NULL);
if (!result.success) { if (!result.success) {
SetErrorMessage("The async function encountered an error"); SetErrorMessage("The async function encountered an error");
} }
@ -141,16 +141,14 @@ void CompletionQueueAsyncWorker::Init(Local<Object> exports) {
Nan::HandleScope scope; Nan::HandleScope scope;
current_threads = 0; current_threads = 0;
waiting_next_calls = 0; waiting_next_calls = 0;
queue = grpc_completion_queue_create(NULL); queue = grpc_completion_queue_create_for_next(NULL);
} }
void CompletionQueueAsyncWorker::HandleOKCallback() { void CompletionQueueAsyncWorker::HandleOKCallback() {
Nan::HandleScope scope; Nan::HandleScope scope;
current_threads -= 1; current_threads -= 1;
TryAddWorker(); TryAddWorker();
Nan::Callback *callback = GetTagCallback(result.tag); CompleteTag(result.tag, NULL);
Local<Value> argv[] = {Nan::Null(), GetTagNodeValue(result.tag)};
callback->Call(2, argv);
DestroyTag(result.tag); DestroyTag(result.tag);
} }
@ -159,10 +157,7 @@ void CompletionQueueAsyncWorker::HandleErrorCallback() {
Nan::HandleScope scope; Nan::HandleScope scope;
current_threads -= 1; current_threads -= 1;
TryAddWorker(); TryAddWorker();
Nan::Callback *callback = GetTagCallback(result.tag); CompleteTag(result.tag, ErrorMessage());
Local<Value> argv[] = {Nan::Error(ErrorMessage())};
callback->Call(1, argv);
DestroyTag(result.tag); DestroyTag(result.tag);
} }
@ -173,9 +168,7 @@ grpc_completion_queue *GetCompletionQueue() {
return CompletionQueueAsyncWorker::GetQueue(); return CompletionQueueAsyncWorker::GetQueue();
} }
void CompletionQueueNext() { void CompletionQueueNext() { CompletionQueueAsyncWorker::Next(); }
CompletionQueueAsyncWorker::Next();
}
void CompletionQueueInit(Local<Object> exports) { void CompletionQueueInit(Local<Object> exports) {
CompletionQueueAsyncWorker::Init(exports); CompletionQueueAsyncWorker::Init(exports);

View File

@ -33,10 +33,10 @@
#ifdef GRPC_UV #ifdef GRPC_UV
#include <uv.h>
#include <node.h>
#include <v8.h>
#include <grpc/grpc.h> #include <grpc/grpc.h>
#include <node.h>
#include <uv.h>
#include <v8.h>
#include "call.h" #include "call.h"
#include "completion_queue.h" #include "completion_queue.h"
@ -57,21 +57,17 @@ void drain_completion_queue(uv_prepare_t *handle) {
grpc_event event; grpc_event event;
(void)handle; (void)handle;
do { do {
event = grpc_completion_queue_next( event = grpc_completion_queue_next(queue, gpr_inf_past(GPR_CLOCK_MONOTONIC),
queue, gpr_inf_past(GPR_CLOCK_MONOTONIC), NULL); NULL);
if (event.type == GRPC_OP_COMPLETE) { if (event.type == GRPC_OP_COMPLETE) {
Nan::Callback *callback = grpc::node::GetTagCallback(event.tag); const char *error_message;
if (event.success) { if (event.success) {
Local<Value> argv[] = {Nan::Null(), error_message = NULL;
grpc::node::GetTagNodeValue(event.tag)};
callback->Call(2, argv);
} else { } else {
Local<Value> argv[] = {Nan::Error( error_message = "The async function encountered an error";
"The async function encountered an error")};
callback->Call(1, argv);
} }
grpc::node::CompleteTag(event.tag); CompleteTag(event.tag, error_message);
grpc::node::DestroyTag(event.tag); grpc::node::DestroyTag(event.tag);
pending_batches--; pending_batches--;
if (pending_batches == 0) { if (pending_batches == 0) {
@ -81,9 +77,7 @@ void drain_completion_queue(uv_prepare_t *handle) {
} while (event.type != GRPC_QUEUE_TIMEOUT); } while (event.type != GRPC_QUEUE_TIMEOUT);
} }
grpc_completion_queue *GetCompletionQueue() { grpc_completion_queue *GetCompletionQueue() { return queue; }
return queue;
}
void CompletionQueueNext() { void CompletionQueueNext() {
if (pending_batches == 0) { if (pending_batches == 0) {
@ -94,7 +88,7 @@ void CompletionQueueNext() {
} }
void CompletionQueueInit(Local<Object> exports) { void CompletionQueueInit(Local<Object> exports) {
queue = grpc_completion_queue_create(NULL); queue = grpc_completion_queue_create_for_next(NULL);
uv_prepare_init(uv_default_loop(), &prepare); uv_prepare_init(uv_default_loop(), &prepare);
pending_batches = 0; pending_batches = 0;
} }

View File

@ -286,8 +286,10 @@ NAN_METHOD(MetadataKeyIsLegal) {
"headerKeyIsLegal's argument must be a string"); "headerKeyIsLegal's argument must be a string");
} }
Local<String> key = Nan::To<String>(info[0]).ToLocalChecked(); Local<String> key = Nan::To<String>(info[0]).ToLocalChecked();
grpc_slice slice = CreateSliceFromString(key);
info.GetReturnValue().Set(static_cast<bool>( info.GetReturnValue().Set(static_cast<bool>(
grpc_header_key_is_legal(CreateSliceFromString(key)))); grpc_header_key_is_legal(slice)));
grpc_slice_unref(slice);
} }
NAN_METHOD(MetadataNonbinValueIsLegal) { NAN_METHOD(MetadataNonbinValueIsLegal) {
@ -296,8 +298,10 @@ NAN_METHOD(MetadataNonbinValueIsLegal) {
"metadataNonbinValueIsLegal's argument must be a string"); "metadataNonbinValueIsLegal's argument must be a string");
} }
Local<String> value = Nan::To<String>(info[0]).ToLocalChecked(); Local<String> value = Nan::To<String>(info[0]).ToLocalChecked();
grpc_slice slice = CreateSliceFromString(value);
info.GetReturnValue().Set(static_cast<bool>( info.GetReturnValue().Set(static_cast<bool>(
grpc_header_nonbin_value_is_legal(CreateSliceFromString(value)))); grpc_header_nonbin_value_is_legal(slice)));
grpc_slice_unref(slice);
} }
NAN_METHOD(MetadataKeyIsBinary) { NAN_METHOD(MetadataKeyIsBinary) {
@ -306,8 +310,10 @@ NAN_METHOD(MetadataKeyIsBinary) {
"metadataKeyIsLegal's argument must be a string"); "metadataKeyIsLegal's argument must be a string");
} }
Local<String> key = Nan::To<String>(info[0]).ToLocalChecked(); Local<String> key = Nan::To<String>(info[0]).ToLocalChecked();
grpc_slice slice = CreateSliceFromString(key);
info.GetReturnValue().Set(static_cast<bool>( info.GetReturnValue().Set(static_cast<bool>(
grpc_is_binary_header(CreateSliceFromString(key)))); grpc_is_binary_header(slice)));
grpc_slice_unref(slice);
} }
static grpc_ssl_roots_override_result get_ssl_roots_override( static grpc_ssl_roots_override_result get_ssl_roots_override(

View File

@ -117,6 +117,8 @@ class NewCallOp : public Op {
bool IsFinalOp() { bool IsFinalOp() {
return false; return false;
} }
void OnComplete(bool success) {
}
grpc_call *call; grpc_call *call;
grpc_call_details details; grpc_call_details details;
@ -126,6 +128,34 @@ class NewCallOp : public Op {
std::string GetTypeString() const { return "new_call"; } std::string GetTypeString() const { return "new_call"; }
}; };
class TryShutdownOp: public Op {
public:
TryShutdownOp(Server *server, Local<Value> server_value) : server(server) {
server_persist.Reset(server_value);
}
Local<Value> GetNodeValue() const {
EscapableHandleScope scope;
return scope.Escape(Nan::New(server_persist));
}
bool ParseOp(Local<Value> value, grpc_op *out) {
return true;
}
bool IsFinalOp() {
return false;
}
void OnComplete(bool success) {
if (success) {
server->DestroyWrappedServer();
}
}
protected:
std::string GetTypeString() const { return "try_shutdown"; }
private:
Server *server;
Nan::Persistent<v8::Value, Nan::CopyablePersistentTraits<v8::Value>>
server_persist;
};
void Server::Init(Local<Object> exports) { void Server::Init(Local<Object> exports) {
HandleScope scope; HandleScope scope;
Local<FunctionTemplate> tpl = Nan::New<FunctionTemplate>(New); Local<FunctionTemplate> tpl = Nan::New<FunctionTemplate>(New);
@ -147,6 +177,13 @@ bool Server::HasInstance(Local<Value> val) {
return Nan::New(fun_tpl)->HasInstance(val); return Nan::New(fun_tpl)->HasInstance(val);
} }
void Server::DestroyWrappedServer() {
if (this->wrapped_server != NULL) {
grpc_server_destroy(this->wrapped_server);
this->wrapped_server = NULL;
}
}
NAN_METHOD(Server::New) { NAN_METHOD(Server::New) {
/* If this is not a constructor call, make a constructor call and return /* If this is not a constructor call, make a constructor call and return
the result */ the result */
@ -193,7 +230,7 @@ NAN_METHOD(Server::RequestCall) {
GetCompletionQueue(), GetCompletionQueue(),
GetCompletionQueue(), GetCompletionQueue(),
new struct tag(new Callback(info[0].As<Function>()), ops.release(), new struct tag(new Callback(info[0].As<Function>()), ops.release(),
NULL)); NULL, Nan::Null()));
if (error != GRPC_CALL_OK) { if (error != GRPC_CALL_OK) {
return Nan::ThrowError(nanErrorWithCode("requestCall failed", error)); return Nan::ThrowError(nanErrorWithCode("requestCall failed", error));
} }
@ -242,11 +279,19 @@ NAN_METHOD(Server::TryShutdown) {
return Nan::ThrowTypeError("tryShutdown can only be called on a Server"); return Nan::ThrowTypeError("tryShutdown can only be called on a Server");
} }
Server *server = ObjectWrap::Unwrap<Server>(info.This()); Server *server = ObjectWrap::Unwrap<Server>(info.This());
if (server->wrapped_server == NULL) {
// Server is already shut down. Call callback immediately.
Nan::Callback callback(info[0].As<Function>());
callback.Call(0, {});
return;
}
TryShutdownOp *op = new TryShutdownOp(server, info.This());
unique_ptr<OpVec> ops(new OpVec()); unique_ptr<OpVec> ops(new OpVec());
ops->push_back(unique_ptr<Op>(op));
grpc_server_shutdown_and_notify( grpc_server_shutdown_and_notify(
server->wrapped_server, GetCompletionQueue(), server->wrapped_server, GetCompletionQueue(),
new struct tag(new Nan::Callback(info[0].As<Function>()), ops.release(), new struct tag(new Nan::Callback(info[0].As<Function>()), ops.release(),
NULL)); NULL, Nan::Null()));
CompletionQueueNext(); CompletionQueueNext();
} }

View File

@ -53,6 +53,8 @@ class Server : public Nan::ObjectWrap {
JavaScript constructor */ JavaScript constructor */
static bool HasInstance(v8::Local<v8::Value> val); static bool HasInstance(v8::Local<v8::Value> val);
void DestroyWrappedServer();
private: private:
explicit Server(grpc_server *server); explicit Server(grpc_server *server);
~Server(); ~Server();

View File

@ -35,8 +35,8 @@
#include "server.h" #include "server.h"
#include <node.h>
#include <nan.h> #include <nan.h>
#include <node.h>
#include "grpc/grpc.h" #include "grpc/grpc.h"
#include "grpc/support/time.h" #include "grpc/support/time.h"
@ -44,7 +44,7 @@ namespace grpc {
namespace node { namespace node {
Server::Server(grpc_server *server) : wrapped_server(server) { Server::Server(grpc_server *server) : wrapped_server(server) {
shutdown_queue = grpc_completion_queue_create(NULL); shutdown_queue = grpc_completion_queue_create_for_pluck(NULL);
grpc_server_register_non_listening_completion_queue(server, shutdown_queue, grpc_server_register_non_listening_completion_queue(server, shutdown_queue,
NULL); NULL);
} }

View File

@ -67,7 +67,7 @@ class ServerShutdownOp : public Op {
} }
Local<Value> GetNodeValue() const { Local<Value> GetNodeValue() const {
return Nan::New<External>(reinterpret_cast<void *>(server)); return Nan::Null();
} }
bool ParseOp(Local<Value> value, grpc_op *out) { bool ParseOp(Local<Value> value, grpc_op *out) {
@ -76,6 +76,11 @@ class ServerShutdownOp : public Op {
bool IsFinalOp() { bool IsFinalOp() {
return false; return false;
} }
void OnComplete(bool success) {
/* Because cancel_all_calls was called, we assume that shutdown_and_notify
completes successfully */
grpc_server_destroy(server);
}
grpc_server *server; grpc_server *server;
@ -94,16 +99,10 @@ NAN_METHOD(ServerShutdownCallback) {
if (!info[0]->IsNull()) { if (!info[0]->IsNull()) {
return Nan::ThrowError("forceShutdown failed somehow"); return Nan::ThrowError("forceShutdown failed somehow");
} }
MaybeLocal<Object> maybe_result = Nan::To<Object>(info[1]);
Local<Object> result = maybe_result.ToLocalChecked();
Local<Value> server_val = Nan::Get(
result, Nan::New("shutdown").ToLocalChecked()).ToLocalChecked();
Local<External> server_extern = server_val.As<External>();
grpc_server *server = reinterpret_cast<grpc_server *>(server_extern->Value());
grpc_server_destroy(server);
} }
void Server::ShutdownServer() { void Server::ShutdownServer() {
Nan::HandleScope scope;
if (this->wrapped_server != NULL) { if (this->wrapped_server != NULL) {
if (shutdown_callback == NULL) { if (shutdown_callback == NULL) {
Local<FunctionTemplate>callback_tpl = Local<FunctionTemplate>callback_tpl =
@ -118,7 +117,8 @@ void Server::ShutdownServer() {
grpc_server_shutdown_and_notify( grpc_server_shutdown_and_notify(
this->wrapped_server, GetCompletionQueue(), this->wrapped_server, GetCompletionQueue(),
new struct tag(new Callback(**shutdown_callback), ops.release(), NULL)); new struct tag(new Callback(**shutdown_callback), ops.release(), NULL,
Nan::Null()));
grpc_server_cancel_all_calls(this->wrapped_server); grpc_server_cancel_all_calls(this->wrapped_server);
CompletionQueueNext(); CompletionQueueNext();
this->wrapped_server = NULL; this->wrapped_server = NULL;

View File

@ -1,6 +1,6 @@
{ {
"name": "grpc-health-check", "name": "grpc-health-check",
"version": "1.3.0-dev", "version": "1.4.0-dev",
"author": "Google Inc.", "author": "Google Inc.",
"description": "Health check service for use with gRPC", "description": "Health check service for use with gRPC",
"repository": { "repository": {
@ -15,7 +15,7 @@
} }
], ],
"dependencies": { "dependencies": {
"grpc": "^1.3.0-dev", "grpc": "^1.4.0-dev",
"lodash": "^3.9.3", "lodash": "^3.9.3",
"google-protobuf": "^3.0.0" "google-protobuf": "^3.0.0"
}, },

View File

@ -1,6 +1,6 @@
{ {
"name": "grpc-tools", "name": "grpc-tools",
"version": "1.3.0-dev", "version": "1.4.0-dev",
"author": "Google Inc.", "author": "Google Inc.",
"description": "Tools for developing with gRPC on Node.js", "description": "Tools for developing with gRPC on Node.js",
"homepage": "http://www.grpc.io/", "homepage": "http://www.grpc.io/",