From 661bdfaa5dee09a1bb403430b70eacfc08eae791 Mon Sep 17 00:00:00 2001 From: murgatroid99 Date: Tue, 19 Sep 2017 13:25:12 -0700 Subject: [PATCH 1/2] Add forced completion queue poll to ensure that waitForReady uses current connectivity state --- .../grpc-native-core/ext/completion_queue.cc | 18 ++++++++++++++---- packages/grpc-native-core/ext/node_grpc.cc | 7 +++++++ packages/grpc-native-core/src/client.js | 5 ++++- 3 files changed, 25 insertions(+), 5 deletions(-) diff --git a/packages/grpc-native-core/ext/completion_queue.cc b/packages/grpc-native-core/ext/completion_queue.cc index a08febbb..0cee1a75 100644 --- a/packages/grpc-native-core/ext/completion_queue.cc +++ b/packages/grpc-native-core/ext/completion_queue.cc @@ -35,7 +35,7 @@ grpc_completion_queue *queue; uv_prepare_t prepare; int pending_batches; -void drain_completion_queue(uv_prepare_t *handle) { +static void drain_completion_queue(uv_prepare_t *handle) { Nan::HandleScope scope; grpc_event event; (void)handle; @@ -53,9 +53,9 @@ void drain_completion_queue(uv_prepare_t *handle) { CompleteTag(event.tag, error_message); grpc::node::DestroyTag(event.tag); pending_batches--; - if (pending_batches == 0) { - uv_prepare_stop(&prepare); - } + } + if (pending_batches == 0) { + uv_prepare_stop(&prepare); } } while (event.type != GRPC_QUEUE_TIMEOUT); } @@ -76,5 +76,15 @@ void CompletionQueueInit(Local exports) { pending_batches = 0; } +void CompletionQueueForcePoll() { + /* This sets the prepare object to poll on the completion queue the next time + * Node polls for IO. But it doesn't increment the number of pending batches, + * so it will immediately stop polling after that unless there is an + * intervening CompletionQueueNext call */ + if (pending_batches == 0) { + uv_prepare_start(&prepare, drain_completion_queue); + } +} + } // namespace node } // namespace grpc diff --git a/packages/grpc-native-core/ext/node_grpc.cc b/packages/grpc-native-core/ext/node_grpc.cc index 11ed0838..a0652c28 100644 --- a/packages/grpc-native-core/ext/node_grpc.cc +++ b/packages/grpc-native-core/ext/node_grpc.cc @@ -265,6 +265,10 @@ NAN_METHOD(SetLogVerbosity) { gpr_set_log_verbosity(severity); } +NAN_METHOD(ForcePoll) { + grpc::node::CompletionQueueForcePoll(); +} + void init(Local exports) { Nan::HandleScope scope; grpc_init(); @@ -306,6 +310,9 @@ void init(Local exports) { Nan::Set(exports, Nan::New("setLogVerbosity").ToLocalChecked(), Nan::GetFunction(Nan::New(SetLogVerbosity)) .ToLocalChecked()); + Nan::Set(exports, Nan::New("forcePoll").ToLocalChecked(), + Nan::GetFunction(Nan::New(ForcePoll)) + .ToLocalChecked()); } NODE_MODULE(grpc_node, init) diff --git a/packages/grpc-native-core/src/client.js b/packages/grpc-native-core/src/client.js index 4208da11..d3c39ca4 100644 --- a/packages/grpc-native-core/src/client.js +++ b/packages/grpc-native-core/src/client.js @@ -800,7 +800,10 @@ Client.prototype.waitForReady = function(deadline, callback) { self.$channel.watchConnectivityState(new_state, deadline, checkState); } }; - checkState(); + /* Force a single round of polling to ensure that the channel state is up + * to date */ + grpc.forcePoll(); + setImmediate(checkState); }; /** From 46e2418fc661eb2d58272c995d8579ec67accf69 Mon Sep 17 00:00:00 2001 From: murgatroid99 Date: Tue, 19 Sep 2017 14:38:27 -0700 Subject: [PATCH 2/2] Update completion queue header to match code changes --- packages/grpc-native-core/ext/completion_queue.h | 2 ++ 1 file changed, 2 insertions(+) diff --git a/packages/grpc-native-core/ext/completion_queue.h b/packages/grpc-native-core/ext/completion_queue.h index f91d5ea8..3a56c327 100644 --- a/packages/grpc-native-core/ext/completion_queue.h +++ b/packages/grpc-native-core/ext/completion_queue.h @@ -28,5 +28,7 @@ void CompletionQueueNext(); void CompletionQueueInit(v8::Local exports); +void CompletionQueueForcePoll(); + } // namespace node } // namespace grpc