Merge pull request #43 from murgatroid99/idle_process_connectivity_state

Add forced completion queue poll to ensure that waitForReady uses current connectivity state
This commit is contained in:
Michael Lumish 2017-09-25 14:48:41 -07:00 committed by GitHub
commit 6376422345
4 changed files with 27 additions and 5 deletions

View File

@ -35,7 +35,7 @@ grpc_completion_queue *queue;
uv_prepare_t prepare;
int pending_batches;
void drain_completion_queue(uv_prepare_t *handle) {
static void drain_completion_queue(uv_prepare_t *handle) {
Nan::HandleScope scope;
grpc_event event;
(void)handle;
@ -53,9 +53,9 @@ void drain_completion_queue(uv_prepare_t *handle) {
CompleteTag(event.tag, error_message);
grpc::node::DestroyTag(event.tag);
pending_batches--;
if (pending_batches == 0) {
uv_prepare_stop(&prepare);
}
}
if (pending_batches == 0) {
uv_prepare_stop(&prepare);
}
} while (event.type != GRPC_QUEUE_TIMEOUT);
}
@ -76,5 +76,15 @@ void CompletionQueueInit(Local<Object> exports) {
pending_batches = 0;
}
void CompletionQueueForcePoll() {
/* This sets the prepare object to poll on the completion queue the next time
* Node polls for IO. But it doesn't increment the number of pending batches,
* so it will immediately stop polling after that unless there is an
* intervening CompletionQueueNext call */
if (pending_batches == 0) {
uv_prepare_start(&prepare, drain_completion_queue);
}
}
} // namespace node
} // namespace grpc

View File

@ -28,5 +28,7 @@ void CompletionQueueNext();
void CompletionQueueInit(v8::Local<v8::Object> exports);
void CompletionQueueForcePoll();
} // namespace node
} // namespace grpc

View File

@ -265,6 +265,10 @@ NAN_METHOD(SetLogVerbosity) {
gpr_set_log_verbosity(severity);
}
NAN_METHOD(ForcePoll) {
grpc::node::CompletionQueueForcePoll();
}
void init(Local<Object> exports) {
Nan::HandleScope scope;
grpc_init();
@ -306,6 +310,9 @@ void init(Local<Object> exports) {
Nan::Set(exports, Nan::New("setLogVerbosity").ToLocalChecked(),
Nan::GetFunction(Nan::New<FunctionTemplate>(SetLogVerbosity))
.ToLocalChecked());
Nan::Set(exports, Nan::New("forcePoll").ToLocalChecked(),
Nan::GetFunction(Nan::New<FunctionTemplate>(ForcePoll))
.ToLocalChecked());
}
NODE_MODULE(grpc_node, init)

View File

@ -800,7 +800,10 @@ Client.prototype.waitForReady = function(deadline, callback) {
self.$channel.watchConnectivityState(new_state, deadline, checkState);
}
};
checkState();
/* Force a single round of polling to ensure that the channel state is up
* to date */
grpc.forcePoll();
setImmediate(checkState);
};
/**