From d11683df615eabdaa4e6a91a42736e5a71678020 Mon Sep 17 00:00:00 2001 From: Sree Kuchibhotla Date: Fri, 3 Mar 2017 17:48:29 -0800 Subject: [PATCH 01/15] Node: Completion queue API changes --- ext/completion_queue_threadpool.cc | 19 +++++++++---------- ext/completion_queue_uv.cc | 23 +++++++++++------------ ext/server_generic.cc | 5 +++-- 3 files changed, 23 insertions(+), 24 deletions(-) diff --git a/ext/completion_queue_threadpool.cc b/ext/completion_queue_threadpool.cc index 1917074d..b5227bad 100644 --- a/ext/completion_queue_threadpool.cc +++ b/ext/completion_queue_threadpool.cc @@ -34,14 +34,14 @@ /* I don't like using #ifndef, but I don't see a better way to do this */ #ifndef GRPC_UV -#include #include +#include +#include "call.h" +#include "completion_queue.h" #include "grpc/grpc.h" #include "grpc/support/log.h" #include "grpc/support/time.h" -#include "completion_queue.h" -#include "call.h" namespace grpc { namespace node { @@ -111,8 +111,8 @@ CompletionQueueAsyncWorker::CompletionQueueAsyncWorker() CompletionQueueAsyncWorker::~CompletionQueueAsyncWorker() {} void CompletionQueueAsyncWorker::Execute() { - result = - grpc_completion_queue_next(queue, gpr_inf_future(GPR_CLOCK_REALTIME), NULL); + result = grpc_completion_queue_next(queue, gpr_inf_future(GPR_CLOCK_REALTIME), + NULL); if (!result.success) { SetErrorMessage("The async function encountered an error"); } @@ -141,7 +141,8 @@ void CompletionQueueAsyncWorker::Init(Local exports) { Nan::HandleScope scope; current_threads = 0; waiting_next_calls = 0; - queue = grpc_completion_queue_create(NULL); + queue = + grpc_completion_queue_create(GRPC_CQ_NEXT, GRPC_CQ_DEFAULT_POLLING, NULL); } void CompletionQueueAsyncWorker::HandleOKCallback() { @@ -173,9 +174,7 @@ grpc_completion_queue *GetCompletionQueue() { return CompletionQueueAsyncWorker::GetQueue(); } -void CompletionQueueNext() { - CompletionQueueAsyncWorker::Next(); -} +void CompletionQueueNext() { CompletionQueueAsyncWorker::Next(); } void CompletionQueueInit(Local exports) { CompletionQueueAsyncWorker::Init(exports); @@ -184,4 +183,4 @@ void CompletionQueueInit(Local exports) { } // namespace node } // namespace grpc -#endif /* GRPC_UV */ +#endif /* GRPC_UV */ diff --git a/ext/completion_queue_uv.cc b/ext/completion_queue_uv.cc index 615973a6..9c1f093a 100644 --- a/ext/completion_queue_uv.cc +++ b/ext/completion_queue_uv.cc @@ -33,10 +33,10 @@ #ifdef GRPC_UV -#include -#include -#include #include +#include +#include +#include #include "call.h" #include "completion_queue.h" @@ -57,18 +57,18 @@ void drain_completion_queue(uv_prepare_t *handle) { grpc_event event; (void)handle; do { - event = grpc_completion_queue_next( - queue, gpr_inf_past(GPR_CLOCK_MONOTONIC), NULL); + event = grpc_completion_queue_next(queue, gpr_inf_past(GPR_CLOCK_MONOTONIC), + NULL); if (event.type == GRPC_OP_COMPLETE) { Nan::Callback *callback = grpc::node::GetTagCallback(event.tag); if (event.success) { Local argv[] = {Nan::Null(), - grpc::node::GetTagNodeValue(event.tag)}; + grpc::node::GetTagNodeValue(event.tag)}; callback->Call(2, argv); } else { - Local argv[] = {Nan::Error( - "The async function encountered an error")}; + Local argv[] = { + Nan::Error("The async function encountered an error")}; callback->Call(1, argv); } grpc::node::CompleteTag(event.tag); @@ -81,9 +81,7 @@ void drain_completion_queue(uv_prepare_t *handle) { } while (event.type != GRPC_QUEUE_TIMEOUT); } -grpc_completion_queue *GetCompletionQueue() { - return queue; -} +grpc_completion_queue *GetCompletionQueue() { return queue; } void CompletionQueueNext() { if (pending_batches == 0) { @@ -94,7 +92,8 @@ void CompletionQueueNext() { } void CompletionQueueInit(Local exports) { - queue = grpc_completion_queue_create(NULL); + queue = + grpc_completion_queue_create(GRPC_CQ_NEXT, GRPC_CQ_DEFAULT_POLLING, NULL); uv_prepare_init(uv_default_loop(), &prepare); pending_batches = 0; } diff --git a/ext/server_generic.cc b/ext/server_generic.cc index 0cf20f75..787605ae 100644 --- a/ext/server_generic.cc +++ b/ext/server_generic.cc @@ -35,8 +35,8 @@ #include "server.h" -#include #include +#include #include "grpc/grpc.h" #include "grpc/support/time.h" @@ -44,7 +44,8 @@ namespace grpc { namespace node { Server::Server(grpc_server *server) : wrapped_server(server) { - shutdown_queue = grpc_completion_queue_create(NULL); + shutdown_queue = grpc_completion_queue_create(GRPC_CQ_PLUCK, + GRPC_CQ_DEFAULT_POLLING, NULL); grpc_server_register_non_listening_completion_queue(server, shutdown_queue, NULL); } From d18eb949eefc7fc86fb6eb130a6870eb513c0401 Mon Sep 17 00:00:00 2001 From: Sree Kuchibhotla Date: Wed, 22 Mar 2017 12:35:09 -0700 Subject: [PATCH 02/15] Node changes --- ext/completion_queue_threadpool.cc | 3 +-- ext/completion_queue_uv.cc | 3 +-- ext/server_generic.cc | 3 +-- 3 files changed, 3 insertions(+), 6 deletions(-) diff --git a/ext/completion_queue_threadpool.cc b/ext/completion_queue_threadpool.cc index b5227bad..0b8bc019 100644 --- a/ext/completion_queue_threadpool.cc +++ b/ext/completion_queue_threadpool.cc @@ -141,8 +141,7 @@ void CompletionQueueAsyncWorker::Init(Local exports) { Nan::HandleScope scope; current_threads = 0; waiting_next_calls = 0; - queue = - grpc_completion_queue_create(GRPC_CQ_NEXT, GRPC_CQ_DEFAULT_POLLING, NULL); + queue = grpc_completion_queue_create_for_next(NULL); } void CompletionQueueAsyncWorker::HandleOKCallback() { diff --git a/ext/completion_queue_uv.cc b/ext/completion_queue_uv.cc index 9c1f093a..72900501 100644 --- a/ext/completion_queue_uv.cc +++ b/ext/completion_queue_uv.cc @@ -92,8 +92,7 @@ void CompletionQueueNext() { } void CompletionQueueInit(Local exports) { - queue = - grpc_completion_queue_create(GRPC_CQ_NEXT, GRPC_CQ_DEFAULT_POLLING, NULL); + queue = grpc_completion_queue_create_for_next(NULL); uv_prepare_init(uv_default_loop(), &prepare); pending_batches = 0; } diff --git a/ext/server_generic.cc b/ext/server_generic.cc index 787605ae..24573bd5 100644 --- a/ext/server_generic.cc +++ b/ext/server_generic.cc @@ -44,8 +44,7 @@ namespace grpc { namespace node { Server::Server(grpc_server *server) : wrapped_server(server) { - shutdown_queue = grpc_completion_queue_create(GRPC_CQ_PLUCK, - GRPC_CQ_DEFAULT_POLLING, NULL); + shutdown_queue = grpc_completion_queue_create_for_pluck(NULL); grpc_server_register_non_listening_completion_queue(server, shutdown_queue, NULL); } From 54e008cbe17d44618da458633599335908c72df0 Mon Sep 17 00:00:00 2001 From: murgatroid99 Date: Mon, 3 Apr 2017 15:31:53 -0700 Subject: [PATCH 03/15] Properly unref some slices in Node glue code --- ext/call.cc | 17 +++++++++++++---- ext/node_grpc.cc | 12 +++++++++--- 2 files changed, 22 insertions(+), 7 deletions(-) diff --git a/ext/call.cc b/ext/call.cc index 244546d3..5c311158 100644 --- a/ext/call.cc +++ b/ext/call.cc @@ -574,6 +574,14 @@ void Call::CompleteBatch(bool is_final_op) { } NAN_METHOD(Call::New) { + /* Arguments: + * 0: Channel to make the call on + * 1: Method + * 2: Deadline + * 3: host + * 4: parent Call + * 5: propagation flags + */ if (info.IsConstructCall()) { Call *call; if (info[0]->IsExternal()) { @@ -618,25 +626,26 @@ NAN_METHOD(Call::New) { double deadline = Nan::To(info[2]).FromJust(); grpc_channel *wrapped_channel = channel->GetWrappedChannel(); grpc_call *wrapped_call; + grpc_slice method = CreateSliceFromString( + Nan::To(info[1]).ToLocalChecked()); if (info[3]->IsString()) { grpc_slice *host = new grpc_slice; *host = CreateSliceFromString( Nan::To(info[3]).ToLocalChecked()); wrapped_call = grpc_channel_create_call( wrapped_channel, parent_call, propagate_flags, - GetCompletionQueue(), CreateSliceFromString( - Nan::To(info[1]).ToLocalChecked()), + GetCompletionQueue(), method, host, MillisecondsToTimespec(deadline), NULL); delete host; } else if (info[3]->IsUndefined() || info[3]->IsNull()) { wrapped_call = grpc_channel_create_call( wrapped_channel, parent_call, propagate_flags, - GetCompletionQueue(), CreateSliceFromString( - Nan::To(info[1]).ToLocalChecked()), + GetCompletionQueue(), method, NULL, MillisecondsToTimespec(deadline), NULL); } else { return Nan::ThrowTypeError("Call's fourth argument must be a string"); } + grpc_slice_unref(method); call = new Call(wrapped_call); Nan::Set(info.This(), Nan::New("channel_").ToLocalChecked(), channel_object); diff --git a/ext/node_grpc.cc b/ext/node_grpc.cc index 95e273f8..122e5e63 100644 --- a/ext/node_grpc.cc +++ b/ext/node_grpc.cc @@ -286,8 +286,10 @@ NAN_METHOD(MetadataKeyIsLegal) { "headerKeyIsLegal's argument must be a string"); } Local key = Nan::To(info[0]).ToLocalChecked(); + grpc_slice slice = CreateSliceFromString(key); info.GetReturnValue().Set(static_cast( - grpc_header_key_is_legal(CreateSliceFromString(key)))); + grpc_header_key_is_legal(slice))); + grpc_slice_unref(slice); } NAN_METHOD(MetadataNonbinValueIsLegal) { @@ -296,8 +298,10 @@ NAN_METHOD(MetadataNonbinValueIsLegal) { "metadataNonbinValueIsLegal's argument must be a string"); } Local value = Nan::To(info[0]).ToLocalChecked(); + grpc_slice slice = CreateSliceFromString(value); info.GetReturnValue().Set(static_cast( - grpc_header_nonbin_value_is_legal(CreateSliceFromString(value)))); + grpc_header_nonbin_value_is_legal(slice))); + grpc_slice_unref(slice); } NAN_METHOD(MetadataKeyIsBinary) { @@ -306,8 +310,10 @@ NAN_METHOD(MetadataKeyIsBinary) { "metadataKeyIsLegal's argument must be a string"); } Local key = Nan::To(info[0]).ToLocalChecked(); + grpc_slice slice = CreateSliceFromString(key); info.GetReturnValue().Set(static_cast( - grpc_is_binary_header(CreateSliceFromString(key)))); + grpc_is_binary_header(slice))); + grpc_slice_unref(slice); } static grpc_ssl_roots_override_result get_ssl_roots_override( From 3b58459472a473da80e13182a22bd153056b3939 Mon Sep 17 00:00:00 2001 From: murgatroid99 Date: Tue, 4 Apr 2017 13:43:49 -0700 Subject: [PATCH 04/15] Fix call destruction bug --- ext/call.cc | 10 +++++++--- ext/call.h | 5 ++++- ext/channel.cc | 2 +- ext/server.cc | 4 ++-- ext/server_uv.cc | 3 ++- 5 files changed, 16 insertions(+), 8 deletions(-) diff --git a/ext/call.cc b/ext/call.cc index 244546d3..62f0130d 100644 --- a/ext/call.cc +++ b/ext/call.cc @@ -466,8 +466,10 @@ class ServerCloseResponseOp : public Op { int cancelled; }; -tag::tag(Callback *callback, OpVec *ops, Call *call) : +tag::tag(Callback *callback, OpVec *ops, Call *call, Local call_value) : callback(callback), ops(ops), call(call){ + HandleScope scope; + call_persist.Reset(call_value); } tag::~tag() { @@ -521,6 +523,7 @@ Call::Call(grpc_call *call) : wrapped_call(call), Call::~Call() { if (wrapped_call != NULL) { grpc_call_destroy(wrapped_call); + wrapped_call = NULL; } } @@ -567,7 +570,8 @@ void Call::CompleteBatch(bool is_final_op) { this->has_final_op_completed = true; } this->pending_batches--; - if (this->has_final_op_completed && this->pending_batches == 0) { + if (this->has_final_op_completed && this->pending_batches == 0 && + this->wrapped_call != NULL) { grpc_call_destroy(this->wrapped_call); this->wrapped_call = NULL; } @@ -721,7 +725,7 @@ NAN_METHOD(Call::StartBatch) { Callback *callback = new Callback(callback_func); grpc_call_error error = grpc_call_start_batch( call->wrapped_call, &ops[0], nops, new struct tag( - callback, op_vector.release(), call), NULL); + callback, op_vector.release(), call, info.This()), NULL); if (error != GRPC_CALL_OK) { return Nan::ThrowError(nanErrorWithCode("startBatch failed", error)); } diff --git a/ext/call.h b/ext/call.h index cffff00f..cceec9c4 100644 --- a/ext/call.h +++ b/ext/call.h @@ -109,11 +109,14 @@ class Op { typedef std::vector> OpVec; struct tag { - tag(Nan::Callback *callback, OpVec *ops, Call *call); + tag(Nan::Callback *callback, OpVec *ops, Call *call, + v8::Local call_value); ~tag(); Nan::Callback *callback; OpVec *ops; Call *call; + Nan::Persistent> + call_persist; }; v8::Local GetTagNodeValue(void *tag); diff --git a/ext/channel.cc b/ext/channel.cc index c795ff7f..1263cc0d 100644 --- a/ext/channel.cc +++ b/ext/channel.cc @@ -280,7 +280,7 @@ NAN_METHOD(Channel::WatchConnectivityState) { channel->wrapped_channel, last_state, MillisecondsToTimespec(deadline), GetCompletionQueue(), new struct tag(callback, - ops.release(), NULL)); + ops.release(), NULL, Nan::Null())); CompletionQueueNext(); } diff --git a/ext/server.cc b/ext/server.cc index ccb55aa5..f0920c84 100644 --- a/ext/server.cc +++ b/ext/server.cc @@ -193,7 +193,7 @@ NAN_METHOD(Server::RequestCall) { GetCompletionQueue(), GetCompletionQueue(), new struct tag(new Callback(info[0].As()), ops.release(), - NULL)); + NULL, Nan::Null())); if (error != GRPC_CALL_OK) { return Nan::ThrowError(nanErrorWithCode("requestCall failed", error)); } @@ -246,7 +246,7 @@ NAN_METHOD(Server::TryShutdown) { grpc_server_shutdown_and_notify( server->wrapped_server, GetCompletionQueue(), new struct tag(new Nan::Callback(info[0].As()), ops.release(), - NULL)); + NULL, Nan::Null())); CompletionQueueNext(); } diff --git a/ext/server_uv.cc b/ext/server_uv.cc index c5e5ca9f..82e7589f 100644 --- a/ext/server_uv.cc +++ b/ext/server_uv.cc @@ -118,7 +118,8 @@ void Server::ShutdownServer() { grpc_server_shutdown_and_notify( this->wrapped_server, GetCompletionQueue(), - new struct tag(new Callback(**shutdown_callback), ops.release(), NULL)); + new struct tag(new Callback(**shutdown_callback), ops.release(), NULL, + Nan::Null())); grpc_server_cancel_all_calls(this->wrapped_server); CompletionQueueNext(); this->wrapped_server = NULL; From 90bb8a9e2d9d248981ec72fb22723e7567c115c9 Mon Sep 17 00:00:00 2001 From: murgatroid99 Date: Thu, 6 Apr 2017 10:37:44 -0700 Subject: [PATCH 05/15] Node: fix leak of sent metadata --- ext/call.cc | 48 ++++++++++++++++++++++++++++++----------- ext/call.h | 2 ++ ext/call_credentials.cc | 2 ++ 3 files changed, 39 insertions(+), 13 deletions(-) diff --git a/ext/call.cc b/ext/call.cc index 5c311158..c77d6a33 100644 --- a/ext/call.cc +++ b/ext/call.cc @@ -99,7 +99,6 @@ Local nanErrorWithCode(const char *msg, grpc_call_error code) { bool CreateMetadataArray(Local metadata, grpc_metadata_array *array) { HandleScope scope; - grpc_metadata_array_init(array); Local keys = Nan::GetOwnPropertyNames(metadata).ToLocalChecked(); for (unsigned int i = 0; i < keys->Length(); i++) { Local current_key = Nan::To( @@ -111,18 +110,20 @@ bool CreateMetadataArray(Local metadata, grpc_metadata_array *array) { array->capacity += Local::Cast(value_array)->Length(); } array->metadata = reinterpret_cast( - gpr_malloc(array->capacity * sizeof(grpc_metadata))); + gpr_zalloc(array->capacity * sizeof(grpc_metadata))); for (unsigned int i = 0; i < keys->Length(); i++) { Local current_key(Nan::To(keys->Get(i)).ToLocalChecked()); Local values = Local::Cast( Nan::Get(metadata, current_key).ToLocalChecked()); - grpc_slice key_slice = grpc_slice_intern(CreateSliceFromString(current_key)); + grpc_slice key_slice = CreateSliceFromString(current_key); + grpc_slice key_intern_slice = grpc_slice_intern(key_slice); + grpc_slice_unref(key_slice); for (unsigned int j = 0; j < values->Length(); j++) { Local value = Nan::Get(values, j).ToLocalChecked(); grpc_metadata *current = &array->metadata[array->count]; - current->key = key_slice; + current->key = key_intern_slice; // Only allow binary headers for "-bin" keys - if (grpc_is_binary_header(key_slice)) { + if (grpc_is_binary_header(key_intern_slice)) { if (::node::Buffer::HasInstance(value)) { current->value = CreateSliceFromBuffer(value); } else { @@ -142,6 +143,14 @@ bool CreateMetadataArray(Local metadata, grpc_metadata_array *array) { return true; } +void DestroyMetadataArray(grpc_metadata_array *array) { + for (size_t i = 0; i < array->count; i++) { + // Don't unref keys because they are interned + grpc_slice_unref(array->metadata[i].value); + } + grpc_metadata_array_destroy(array); +} + Local ParseMetadata(const grpc_metadata_array *metadata_array) { EscapableHandleScope scope; grpc_metadata *metadata_elements = metadata_array->metadata; @@ -179,6 +188,12 @@ Op::~Op() { class SendMetadataOp : public Op { public: + SendMetadataOp() { + grpc_metadata_array_init(&send_metadata); + } + ~SendMetadataOp() { + DestroyMetadataArray(&send_metadata); + } Local GetNodeValue() const { EscapableHandleScope scope; return scope.Escape(Nan::True()); @@ -187,17 +202,16 @@ class SendMetadataOp : public Op { if (!value->IsObject()) { return false; } - grpc_metadata_array array; MaybeLocal maybe_metadata = Nan::To(value); if (maybe_metadata.IsEmpty()) { return false; } if (!CreateMetadataArray(maybe_metadata.ToLocalChecked(), - &array)) { + &send_metadata)) { return false; } - out->data.send_initial_metadata.count = array.count; - out->data.send_initial_metadata.metadata = array.metadata; + out->data.send_initial_metadata.count = send_metadata.count; + out->data.send_initial_metadata.metadata = send_metadata.metadata; return true; } bool IsFinalOp() { @@ -207,6 +221,8 @@ class SendMetadataOp : public Op { std::string GetTypeString() const { return "send_metadata"; } + private: + grpc_metadata_array send_metadata; }; class SendMessageOp : public Op { @@ -272,8 +288,12 @@ class SendClientCloseOp : public Op { class SendServerStatusOp : public Op { public: + SendServerStatusOp() { + grpc_metadata_array_init(&status_metadata); + } ~SendServerStatusOp() { grpc_slice_unref(details); + DestroyMetadataArray(&status_metadata); } Local GetNodeValue() const { EscapableHandleScope scope; @@ -313,12 +333,13 @@ class SendServerStatusOp : public Op { } Local details = Nan::To( maybe_details.ToLocalChecked()).ToLocalChecked(); - grpc_metadata_array array; - if (!CreateMetadataArray(metadata, &array)) { + if (!CreateMetadataArray(metadata, &status_metadata)) { return false; } - out->data.send_status_from_server.trailing_metadata_count = array.count; - out->data.send_status_from_server.trailing_metadata = array.metadata; + out->data.send_status_from_server.trailing_metadata_count = + status_metadata.count; + out->data.send_status_from_server.trailing_metadata = + status_metadata.metadata; out->data.send_status_from_server.status = static_cast(code); this->details = CreateSliceFromString(details); @@ -335,6 +356,7 @@ class SendServerStatusOp : public Op { private: grpc_slice details; + grpc_metadata_array status_metadata; }; class GetMetadataOp : public Op { diff --git a/ext/call.h b/ext/call.h index cffff00f..fe2abab9 100644 --- a/ext/call.h +++ b/ext/call.h @@ -58,6 +58,8 @@ v8::Local ParseMetadata(const grpc_metadata_array *metadata_array); bool CreateMetadataArray(v8::Local metadata, grpc_metadata_array *array); +void DestroyMetadataArray(grpc_metadata_array *array); + /* Wrapper class for grpc_call structs. */ class Call : public Nan::ObjectWrap { public: diff --git a/ext/call_credentials.cc b/ext/call_credentials.cc index afcc3631..5bd4bdcd 100644 --- a/ext/call_credentials.cc +++ b/ext/call_credentials.cc @@ -211,6 +211,7 @@ NAN_METHOD(PluginCallback) { Utf8String details_utf8_str(info[1]); char *details = *details_utf8_str; grpc_metadata_array array; + grpc_metadata_array_init(&array); Local callback_data = Nan::To(info[3]).ToLocalChecked(); if (!CreateMetadataArray(Nan::To(info[2]).ToLocalChecked(), &array)){ @@ -226,6 +227,7 @@ NAN_METHOD(PluginCallback) { Nan::New("user_data").ToLocalChecked() ).ToLocalChecked().As()->Value(); cb(user_data, array.metadata, array.count, code, details); + DestroyMetadataArray(&array); } NAUV_WORK_CB(SendPluginCallback) { From 60a0ed4903b2604aae01b1cb67054e798521ac48 Mon Sep 17 00:00:00 2001 From: murgatroid99 Date: Thu, 6 Apr 2017 13:54:37 -0700 Subject: [PATCH 06/15] Node: consolidate call destruction logic --- ext/call.cc | 18 ++++++++++-------- ext/call.h | 2 ++ 2 files changed, 12 insertions(+), 8 deletions(-) diff --git a/ext/call.cc b/ext/call.cc index 62f0130d..769f744b 100644 --- a/ext/call.cc +++ b/ext/call.cc @@ -515,16 +515,20 @@ void DestroyTag(void *tag) { delete tag_struct; } +void Call::DestroyCall() { + if (this->wrapped_call != NULL) { + grpc_call_destroy(this->wrapped_call); + this->wrapped_call = NULL; + } +} + Call::Call(grpc_call *call) : wrapped_call(call), pending_batches(0), has_final_op_completed(false) { } Call::~Call() { - if (wrapped_call != NULL) { - grpc_call_destroy(wrapped_call); - wrapped_call = NULL; - } + DestroyCall(); } void Call::Init(Local exports) { @@ -570,10 +574,8 @@ void Call::CompleteBatch(bool is_final_op) { this->has_final_op_completed = true; } this->pending_batches--; - if (this->has_final_op_completed && this->pending_batches == 0 && - this->wrapped_call != NULL) { - grpc_call_destroy(this->wrapped_call); - this->wrapped_call = NULL; + if (this->has_final_op_completed && this->pending_batches == 0) { + this->DestroyCall(); } } diff --git a/ext/call.h b/ext/call.h index cceec9c4..7fd03ca1 100644 --- a/ext/call.h +++ b/ext/call.h @@ -76,6 +76,8 @@ class Call : public Nan::ObjectWrap { Call(const Call &); Call &operator=(const Call &); + void DestroyCall(); + static NAN_METHOD(New); static NAN_METHOD(StartBatch); static NAN_METHOD(Cancel); From e367327fe4437d69133578da42b5bb5c81d685c6 Mon Sep 17 00:00:00 2001 From: murgatroid99 Date: Fri, 7 Apr 2017 10:41:21 -0700 Subject: [PATCH 07/15] Bump version to 1.2.3 --- health_check/package.json | 4 ++-- tools/package.json | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/health_check/package.json b/health_check/package.json index 61bb83f2..5ad59e27 100644 --- a/health_check/package.json +++ b/health_check/package.json @@ -1,6 +1,6 @@ { "name": "grpc-health-check", - "version": "1.2.2", + "version": "1.2.3", "author": "Google Inc.", "description": "Health check service for use with gRPC", "repository": { @@ -15,7 +15,7 @@ } ], "dependencies": { - "grpc": "^1.2.2", + "grpc": "^1.2.3", "lodash": "^3.9.3", "google-protobuf": "^3.0.0" }, diff --git a/tools/package.json b/tools/package.json index 460f2fc8..e06775b5 100644 --- a/tools/package.json +++ b/tools/package.json @@ -1,6 +1,6 @@ { "name": "grpc-tools", - "version": "1.2.2", + "version": "1.2.3", "author": "Google Inc.", "description": "Tools for developing with gRPC on Node.js", "homepage": "http://www.grpc.io/", From 25d8d5b60996ea0f66d1e8cd4dcb3e2f0fb72dc0 Mon Sep 17 00:00:00 2001 From: murgatroid99 Date: Mon, 10 Apr 2017 14:37:43 -0700 Subject: [PATCH 08/15] Refactor tag completion handling into one function --- ext/call.cc | 32 ++++++++++++++---------------- ext/call.h | 6 +----- ext/completion_queue_threadpool.cc | 9 ++------- ext/completion_queue_uv.cc | 12 ++++------- 4 files changed, 22 insertions(+), 37 deletions(-) diff --git a/ext/call.cc b/ext/call.cc index 5d573110..8d5e2ced 100644 --- a/ext/call.cc +++ b/ext/call.cc @@ -499,25 +499,23 @@ tag::~tag() { delete ops; } -Local GetTagNodeValue(void *tag) { - EscapableHandleScope scope; +void CompleteTag(void *tag, const char *error_message) { + HandleScope scope; struct tag *tag_struct = reinterpret_cast(tag); - Local tag_obj = Nan::New(); - for (vector >::iterator it = tag_struct->ops->begin(); - it != tag_struct->ops->end(); ++it) { - Op *op_ptr = it->get(); - Nan::Set(tag_obj, op_ptr->GetOpType(), op_ptr->GetNodeValue()); + Callback *callback = tag_struct->callback; + if (error_message == NULL) { + Local tag_obj = Nan::New(); + for (vector >::iterator it = tag_struct->ops->begin(); + it != tag_struct->ops->end(); ++it) { + Op *op_ptr = it->get(); + Nan::Set(tag_obj, op_ptr->GetOpType(), op_ptr->GetNodeValue()); + } + Local argv[] = {Nan::Null(), tag_obj}; + callback->Call(2, argv); + } else { + Local argv[] = {Nan::Error(error_message)}; + callback->Call(1, argv); } - return scope.Escape(tag_obj); -} - -Callback *GetTagCallback(void *tag) { - struct tag *tag_struct = reinterpret_cast(tag); - return tag_struct->callback; -} - -void CompleteTag(void *tag) { - struct tag *tag_struct = reinterpret_cast(tag); bool is_final_op = false; if (tag_struct->call == NULL) { return; diff --git a/ext/call.h b/ext/call.h index 53a5e4ab..4316e9ed 100644 --- a/ext/call.h +++ b/ext/call.h @@ -123,13 +123,9 @@ struct tag { call_persist; }; -v8::Local GetTagNodeValue(void *tag); - -Nan::Callback *GetTagCallback(void *tag); - void DestroyTag(void *tag); -void CompleteTag(void *tag); +void CompleteTag(void *tag, const char *error_message); } // namespace node } // namespace grpc diff --git a/ext/completion_queue_threadpool.cc b/ext/completion_queue_threadpool.cc index 1917074d..7b1bdda0 100644 --- a/ext/completion_queue_threadpool.cc +++ b/ext/completion_queue_threadpool.cc @@ -148,9 +148,7 @@ void CompletionQueueAsyncWorker::HandleOKCallback() { Nan::HandleScope scope; current_threads -= 1; TryAddWorker(); - Nan::Callback *callback = GetTagCallback(result.tag); - Local argv[] = {Nan::Null(), GetTagNodeValue(result.tag)}; - callback->Call(2, argv); + CompleteTag(result.tag, NULL); DestroyTag(result.tag); } @@ -159,10 +157,7 @@ void CompletionQueueAsyncWorker::HandleErrorCallback() { Nan::HandleScope scope; current_threads -= 1; TryAddWorker(); - Nan::Callback *callback = GetTagCallback(result.tag); - Local argv[] = {Nan::Error(ErrorMessage())}; - - callback->Call(1, argv); + CompleteTag(result.tag, ErrorMessage()); DestroyTag(result.tag); } diff --git a/ext/completion_queue_uv.cc b/ext/completion_queue_uv.cc index 615973a6..0f6f7da4 100644 --- a/ext/completion_queue_uv.cc +++ b/ext/completion_queue_uv.cc @@ -61,17 +61,13 @@ void drain_completion_queue(uv_prepare_t *handle) { queue, gpr_inf_past(GPR_CLOCK_MONOTONIC), NULL); if (event.type == GRPC_OP_COMPLETE) { - Nan::Callback *callback = grpc::node::GetTagCallback(event.tag); + const char *error_message; if (event.success) { - Local argv[] = {Nan::Null(), - grpc::node::GetTagNodeValue(event.tag)}; - callback->Call(2, argv); + error_message = NULL; } else { - Local argv[] = {Nan::Error( - "The async function encountered an error")}; - callback->Call(1, argv); + error_message = "The async function encountered an error"; } - grpc::node::CompleteTag(event.tag); + CompleteTag(event.tag, error_message); grpc::node::DestroyTag(event.tag); pending_batches--; if (pending_batches == 0) { From da129b2b4a6cb155dc080890ab2b4a543635e3a6 Mon Sep 17 00:00:00 2001 From: murgatroid99 Date: Mon, 10 Apr 2017 15:43:09 -0700 Subject: [PATCH 09/15] Add native tag completion callbacks, dispose of server after tryShutdown succeeds --- ext/call.cc | 23 ++++++++++++++++++++--- ext/call.h | 1 + ext/server.cc | 37 +++++++++++++++++++++++++++++++++++++ ext/server.h | 2 ++ ext/server_uv.cc | 3 +++ 5 files changed, 63 insertions(+), 3 deletions(-) diff --git a/ext/call.cc b/ext/call.cc index 8d5e2ced..bb11cfb8 100644 --- a/ext/call.cc +++ b/ext/call.cc @@ -217,6 +217,8 @@ class SendMetadataOp : public Op { bool IsFinalOp() { return false; } + void OnComplete() { + } protected: std::string GetTypeString() const { return "send_metadata"; @@ -260,6 +262,8 @@ class SendMessageOp : public Op { bool IsFinalOp() { return false; } + void OnComplete() { + } protected: std::string GetTypeString() const { return "send_message"; @@ -280,6 +284,8 @@ class SendClientCloseOp : public Op { bool IsFinalOp() { return false; } + void OnComplete() { + } protected: std::string GetTypeString() const { return "client_close"; @@ -349,6 +355,8 @@ class SendServerStatusOp : public Op { bool IsFinalOp() { return true; } + void OnComplete() { + } protected: std::string GetTypeString() const { return "send_status"; @@ -381,6 +389,8 @@ class GetMetadataOp : public Op { bool IsFinalOp() { return false; } + void OnComplete() { + } protected: std::string GetTypeString() const { @@ -413,6 +423,8 @@ class ReadMessageOp : public Op { bool IsFinalOp() { return false; } + void OnComplete() { + } protected: std::string GetTypeString() const { @@ -454,6 +466,8 @@ class ClientStatusOp : public Op { bool IsFinalOp() { return true; } + void OnComplete() { + } protected: std::string GetTypeString() const { return "status"; @@ -478,6 +492,8 @@ class ServerCloseResponseOp : public Op { bool IsFinalOp() { return false; } + void OnComplete() { + } protected: std::string GetTypeString() const { @@ -517,16 +533,17 @@ void CompleteTag(void *tag, const char *error_message) { callback->Call(1, argv); } bool is_final_op = false; - if (tag_struct->call == NULL) { - return; - } for (vector >::iterator it = tag_struct->ops->begin(); it != tag_struct->ops->end(); ++it) { Op *op_ptr = it->get(); + op_ptr->OnComplete(); if (op_ptr->IsFinalOp()) { is_final_op = true; } } + if (tag_struct->call == NULL) { + return; + } tag_struct->call->CompleteBatch(is_final_op); } diff --git a/ext/call.h b/ext/call.h index 4316e9ed..b38f5006 100644 --- a/ext/call.h +++ b/ext/call.h @@ -106,6 +106,7 @@ class Op { virtual ~Op(); v8::Local GetOpType() const; virtual bool IsFinalOp() = 0; + virtual void OnComplete() = 0; protected: virtual std::string GetTypeString() const = 0; diff --git a/ext/server.cc b/ext/server.cc index f0920c84..22c89c58 100644 --- a/ext/server.cc +++ b/ext/server.cc @@ -117,6 +117,8 @@ class NewCallOp : public Op { bool IsFinalOp() { return false; } + void OnComplete() { + } grpc_call *call; grpc_call_details details; @@ -126,6 +128,32 @@ class NewCallOp : public Op { std::string GetTypeString() const { return "new_call"; } }; +class TryShutdownOp: public Op { + public: + TryShutdownOp(Server *server, Local server_value) : server(server) { + server_persist.Reset(server_value); + } + Local GetNodeValue() const { + EscapableHandleScope scope; + return scope.Escape(Nan::New(server_persist)); + } + bool ParseOp(Local value, grpc_op *out) { + return true; + } + bool IsFinalOp() { + return false; + } + void OnComplete() { + server->DestroyWrappedServer(); + } + protected: + std::string GetTypeString() const { return "try_shutdown"; } + private: + Server *server; + Nan::Persistent> + server_persist; +}; + void Server::Init(Local exports) { HandleScope scope; Local tpl = Nan::New(New); @@ -147,6 +175,13 @@ bool Server::HasInstance(Local val) { return Nan::New(fun_tpl)->HasInstance(val); } +void Server::DestroyWrappedServer() { + if (this->wrapped_server != NULL) { + grpc_server_destroy(this->wrapped_server); + this->wrapped_server = NULL; + } +} + NAN_METHOD(Server::New) { /* If this is not a constructor call, make a constructor call and return the result */ @@ -242,7 +277,9 @@ NAN_METHOD(Server::TryShutdown) { return Nan::ThrowTypeError("tryShutdown can only be called on a Server"); } Server *server = ObjectWrap::Unwrap(info.This()); + TryShutdownOp *op = new TryShutdownOp(server, info.This()); unique_ptr ops(new OpVec()); + ops->push_back(unique_ptr(op)); grpc_server_shutdown_and_notify( server->wrapped_server, GetCompletionQueue(), new struct tag(new Nan::Callback(info[0].As()), ops.release(), diff --git a/ext/server.h b/ext/server.h index ab5fc210..c0f2e865 100644 --- a/ext/server.h +++ b/ext/server.h @@ -53,6 +53,8 @@ class Server : public Nan::ObjectWrap { JavaScript constructor */ static bool HasInstance(v8::Local val); + void DestroyWrappedServer(); + private: explicit Server(grpc_server *server); ~Server(); diff --git a/ext/server_uv.cc b/ext/server_uv.cc index 82e7589f..bf402e19 100644 --- a/ext/server_uv.cc +++ b/ext/server_uv.cc @@ -76,6 +76,8 @@ class ServerShutdownOp : public Op { bool IsFinalOp() { return false; } + void OnComplete() { + } grpc_server *server; @@ -104,6 +106,7 @@ NAN_METHOD(ServerShutdownCallback) { } void Server::ShutdownServer() { + Nan::HandleScope scope; if (this->wrapped_server != NULL) { if (shutdown_callback == NULL) { Localcallback_tpl = From 8b37153972c787b22b8ee2deb932a7744f92282b Mon Sep 17 00:00:00 2001 From: murgatroid99 Date: Tue, 11 Apr 2017 11:39:32 -0700 Subject: [PATCH 10/15] Move ForceShutdown completion handling to new OnComplete method --- ext/server_uv.cc | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/ext/server_uv.cc b/ext/server_uv.cc index bf402e19..6a31b892 100644 --- a/ext/server_uv.cc +++ b/ext/server_uv.cc @@ -67,7 +67,7 @@ class ServerShutdownOp : public Op { } Local GetNodeValue() const { - return Nan::New(reinterpret_cast(server)); + return Nan::Null(); } bool ParseOp(Local value, grpc_op *out) { @@ -77,6 +77,7 @@ class ServerShutdownOp : public Op { return false; } void OnComplete() { + grpc_server_destroy(server); } grpc_server *server; @@ -96,13 +97,6 @@ NAN_METHOD(ServerShutdownCallback) { if (!info[0]->IsNull()) { return Nan::ThrowError("forceShutdown failed somehow"); } - MaybeLocal maybe_result = Nan::To(info[1]); - Local result = maybe_result.ToLocalChecked(); - Local server_val = Nan::Get( - result, Nan::New("shutdown").ToLocalChecked()).ToLocalChecked(); - Local server_extern = server_val.As(); - grpc_server *server = reinterpret_cast(server_extern->Value()); - grpc_server_destroy(server); } void Server::ShutdownServer() { From 902527e52945fea8fa4e962cc92e4ad438cef8dd Mon Sep 17 00:00:00 2001 From: murgatroid99 Date: Tue, 11 Apr 2017 12:13:08 -0700 Subject: [PATCH 11/15] Node server: add NULL check to tryShutdown --- ext/server.cc | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/ext/server.cc b/ext/server.cc index 22c89c58..2d017dd0 100644 --- a/ext/server.cc +++ b/ext/server.cc @@ -277,6 +277,12 @@ NAN_METHOD(Server::TryShutdown) { return Nan::ThrowTypeError("tryShutdown can only be called on a Server"); } Server *server = ObjectWrap::Unwrap(info.This()); + if (server->wrapped_server == NULL) { + // Server is already shut down. Call callback immediately. + Nan::Callback callback(info[0].As()); + callback.Call(0, {}); + return; + } TryShutdownOp *op = new TryShutdownOp(server, info.This()); unique_ptr ops(new OpVec()); ops->push_back(unique_ptr(op)); From 8b7472fbec7c5d05106cd345569a9e75928eb5d3 Mon Sep 17 00:00:00 2001 From: murgatroid99 Date: Tue, 11 Apr 2017 14:34:04 -0700 Subject: [PATCH 12/15] Only delete core-level server if shutdown was successful --- ext/call.cc | 19 ++++++++++--------- ext/call.h | 2 +- ext/server.cc | 8 +++++--- ext/server_uv.cc | 4 +++- 4 files changed, 19 insertions(+), 14 deletions(-) diff --git a/ext/call.cc b/ext/call.cc index bb11cfb8..bd60775a 100644 --- a/ext/call.cc +++ b/ext/call.cc @@ -217,7 +217,7 @@ class SendMetadataOp : public Op { bool IsFinalOp() { return false; } - void OnComplete() { + void OnComplete(bool success) { } protected: std::string GetTypeString() const { @@ -262,7 +262,7 @@ class SendMessageOp : public Op { bool IsFinalOp() { return false; } - void OnComplete() { + void OnComplete(bool success) { } protected: std::string GetTypeString() const { @@ -284,7 +284,7 @@ class SendClientCloseOp : public Op { bool IsFinalOp() { return false; } - void OnComplete() { + void OnComplete(bool success) { } protected: std::string GetTypeString() const { @@ -355,7 +355,7 @@ class SendServerStatusOp : public Op { bool IsFinalOp() { return true; } - void OnComplete() { + void OnComplete(bool success) { } protected: std::string GetTypeString() const { @@ -389,7 +389,7 @@ class GetMetadataOp : public Op { bool IsFinalOp() { return false; } - void OnComplete() { + void OnComplete(bool success) { } protected: @@ -423,7 +423,7 @@ class ReadMessageOp : public Op { bool IsFinalOp() { return false; } - void OnComplete() { + void OnComplete(bool success) { } protected: @@ -466,7 +466,7 @@ class ClientStatusOp : public Op { bool IsFinalOp() { return true; } - void OnComplete() { + void OnComplete(bool success) { } protected: std::string GetTypeString() const { @@ -492,7 +492,7 @@ class ServerCloseResponseOp : public Op { bool IsFinalOp() { return false; } - void OnComplete() { + void OnComplete(bool success) { } protected: @@ -532,11 +532,12 @@ void CompleteTag(void *tag, const char *error_message) { Local argv[] = {Nan::Error(error_message)}; callback->Call(1, argv); } + bool success = (error_message == NULL); bool is_final_op = false; for (vector >::iterator it = tag_struct->ops->begin(); it != tag_struct->ops->end(); ++it) { Op *op_ptr = it->get(); - op_ptr->OnComplete(); + op_ptr->OnComplete(success); if (op_ptr->IsFinalOp()) { is_final_op = true; } diff --git a/ext/call.h b/ext/call.h index b38f5006..340e3268 100644 --- a/ext/call.h +++ b/ext/call.h @@ -106,7 +106,7 @@ class Op { virtual ~Op(); v8::Local GetOpType() const; virtual bool IsFinalOp() = 0; - virtual void OnComplete() = 0; + virtual void OnComplete(bool success) = 0; protected: virtual std::string GetTypeString() const = 0; diff --git a/ext/server.cc b/ext/server.cc index 2d017dd0..53843056 100644 --- a/ext/server.cc +++ b/ext/server.cc @@ -117,7 +117,7 @@ class NewCallOp : public Op { bool IsFinalOp() { return false; } - void OnComplete() { + void OnComplete(bool success) { } grpc_call *call; @@ -143,8 +143,10 @@ class TryShutdownOp: public Op { bool IsFinalOp() { return false; } - void OnComplete() { - server->DestroyWrappedServer(); + void OnComplete(bool success) { + if (success) { + server->DestroyWrappedServer(); + } } protected: std::string GetTypeString() const { return "try_shutdown"; } diff --git a/ext/server_uv.cc b/ext/server_uv.cc index 6a31b892..78993831 100644 --- a/ext/server_uv.cc +++ b/ext/server_uv.cc @@ -76,7 +76,9 @@ class ServerShutdownOp : public Op { bool IsFinalOp() { return false; } - void OnComplete() { + void OnComplete(bool success) { + /* Because cancel_all_calls was called, we assume that shutdown_and_notify + completes successfully */ grpc_server_destroy(server); } From ecf89f18d8d1e32bcc74eda47dd74256e5f467cc Mon Sep 17 00:00:00 2001 From: murgatroid99 Date: Wed, 12 Apr 2017 10:23:35 -0700 Subject: [PATCH 13/15] Bump version to 1.2.4 --- health_check/package.json | 4 ++-- tools/package.json | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/health_check/package.json b/health_check/package.json index 5ad59e27..fb1ad777 100644 --- a/health_check/package.json +++ b/health_check/package.json @@ -1,6 +1,6 @@ { "name": "grpc-health-check", - "version": "1.2.3", + "version": "1.2.4", "author": "Google Inc.", "description": "Health check service for use with gRPC", "repository": { @@ -15,7 +15,7 @@ } ], "dependencies": { - "grpc": "^1.2.3", + "grpc": "^1.2.4", "lodash": "^3.9.3", "google-protobuf": "^3.0.0" }, diff --git a/tools/package.json b/tools/package.json index e06775b5..b552c429 100644 --- a/tools/package.json +++ b/tools/package.json @@ -1,6 +1,6 @@ { "name": "grpc-tools", - "version": "1.2.3", + "version": "1.2.4", "author": "Google Inc.", "description": "Tools for developing with gRPC on Node.js", "homepage": "http://www.grpc.io/", From 5706b9c014131b87de924fcb0c6d6a2792932a97 Mon Sep 17 00:00:00 2001 From: David Garcia Quintas Date: Thu, 13 Apr 2017 16:20:56 -0700 Subject: [PATCH 14/15] 1.3.x branch cut --- health_check/package.json | 4 ++-- tools/package.json | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/health_check/package.json b/health_check/package.json index e218f5a4..3d331182 100644 --- a/health_check/package.json +++ b/health_check/package.json @@ -1,6 +1,6 @@ { "name": "grpc-health-check", - "version": "1.3.0-dev", + "version": "1.3.0-pre1", "author": "Google Inc.", "description": "Health check service for use with gRPC", "repository": { @@ -15,7 +15,7 @@ } ], "dependencies": { - "grpc": "^1.3.0-dev", + "grpc": "^1.3.0-pre1", "lodash": "^3.9.3", "google-protobuf": "^3.0.0" }, diff --git a/tools/package.json b/tools/package.json index 3096c6e4..056f28ab 100644 --- a/tools/package.json +++ b/tools/package.json @@ -1,6 +1,6 @@ { "name": "grpc-tools", - "version": "1.3.0-dev", + "version": "1.3.0-pre1", "author": "Google Inc.", "description": "Tools for developing with gRPC on Node.js", "homepage": "http://www.grpc.io/", From 6c175a3d44d16f35fc9000e328dd2ddc6c77408a Mon Sep 17 00:00:00 2001 From: David Garcia Quintas Date: Thu, 13 Apr 2017 16:30:15 -0700 Subject: [PATCH 15/15] master to 1.4.0-dev --- health_check/package.json | 4 ++-- tools/package.json | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/health_check/package.json b/health_check/package.json index e218f5a4..37c9b7a5 100644 --- a/health_check/package.json +++ b/health_check/package.json @@ -1,6 +1,6 @@ { "name": "grpc-health-check", - "version": "1.3.0-dev", + "version": "1.4.0-dev", "author": "Google Inc.", "description": "Health check service for use with gRPC", "repository": { @@ -15,7 +15,7 @@ } ], "dependencies": { - "grpc": "^1.3.0-dev", + "grpc": "^1.4.0-dev", "lodash": "^3.9.3", "google-protobuf": "^3.0.0" }, diff --git a/tools/package.json b/tools/package.json index 3096c6e4..a81aa87f 100644 --- a/tools/package.json +++ b/tools/package.json @@ -1,6 +1,6 @@ { "name": "grpc-tools", - "version": "1.3.0-dev", + "version": "1.4.0-dev", "author": "Google Inc.", "description": "Tools for developing with gRPC on Node.js", "homepage": "http://www.grpc.io/",